@@ -23,13 +23,15 @@ typedef struct SBlockName {
   char parTbName[TSDB_TABLE_NAME_LEN];
 } SBlockName;
 
 static void doRetryDispatchData(void* param, void* tmrId);
+static void doMonitorDispatchData(void* param, void* tmrId);
 static int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet);
 static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq);
 static int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock,
-                                       int32_t vgSz, int64_t groupId);
+                                       int64_t groupId, int64_t now);
 static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId,
                                       int32_t numOfBlocks, int64_t dstTaskId, int32_t type);
+static int32_t getFailedDispatchInfo(SDispatchMsgInfo* pMsgInfo, int64_t now);
+static bool isDispatchRspTimeout(SDispatchEntry* pEntry, int64_t now);
 
 void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) {
  pMsg->msgType = msgType;
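The three added prototypes are the surface of this patch's main idea: every dispatch destination gets its own bookkeeping record (SDispatchEntry, defined in a header that is not part of this diff). A minimal self-contained sketch of that record, with the field set inferred from how the patch uses it below; treat it as an illustration, not the real definition:

#include <stdbool.h>
#include <stdint.h>

typedef struct {
  int32_t nodeId;      // destination vgroup id
  int64_t sendTs;      // when the request was (re)sent, in ms
  int64_t rspTs;       // -1 until a response arrives
  int32_t retryCount;  // number of resends so far
  int32_t status;      // response code reported by downstream
} DispatchEntrySketch;

// mirrors isDispatchRspTimeout below: no rsp yet, and 30s elapsed since send
static bool entryTimedOut(const DispatchEntrySketch* e, int64_t nowMs) {
  return (e->rspTs == -1) && (nowMs - e->sendTs > 30 * 1000);
}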
@@ -42,7 +44,7 @@ static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId,
   pReq->streamId = pTask->id.streamId;
   pReq->srcVgId = vgId;
   pReq->stage = pTask->pMeta->stage;
-  pReq->msgId = pTask->execInfo.dispatch;
+  pReq->msgId = pTask->msgInfo.msgId;
   pReq->upstreamTaskId = pTask->id.taskId;
   pReq->upstreamChildId = pTask->info.selfChildId;
   pReq->upstreamNodeId = pTask->info.nodeId;
@@ -65,6 +67,7 @@ static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId,
 void streamTaskSendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp){
   void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp));
   ((SMsgHead*)buf)->vgId = htonl(pReq->srcNodeId);
+
   SStreamRetrieveRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
   pCont->streamId = pReq->streamId;
   pCont->rspToTaskId = pReq->srcTaskId;
@@ -216,26 +219,66 @@ void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups) {
   taosMemoryFree(pReq);
 }
 
 int32_t getNumOfDispatchBranch(SStreamTask* pTask) {
   return (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH)
              ? 1
              : taosArrayGetSize(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
 }
 
 void clearBufferedDispatchMsg(SStreamTask* pTask) {
   SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo;
   if (pMsgInfo->pData != NULL) {
-    destroyDispatchMsg(pMsgInfo->pData, getNumOfDispatchBranch(pTask));
+    destroyDispatchMsg(pMsgInfo->pData, streamTaskGetNumOfDownstream(pTask));
   }
 
   pMsgInfo->checkpointId = -1;
   pMsgInfo->transId = -1;
   pMsgInfo->pData = NULL;
   pMsgInfo->dispatchMsgType = 0;
 
+  taosThreadMutexLock(&pMsgInfo->lock);
+  taosArrayClear(pTask->msgInfo.pSendInfo);
+  taosThreadMutexUnlock(&pMsgInfo->lock);
 }
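Note the locking discipline introduced here: pSendInfo is appended to on the dispatch path and marked on the response path, so clearing it must also happen under pMsgInfo->lock. A hedged stand-alone sketch of the same pattern using plain pthreads (stand-in types, not TDengine's):

#include <pthread.h>
#include <stdint.h>

typedef struct {
  pthread_mutex_t lock;
  int32_t         numOfEntries;  // stands in for taosArrayGetSize(pSendInfo)
} SendInfoSketch;

static void resetSendInfo(SendInfoSketch* s) {
  pthread_mutex_lock(&s->lock);
  s->numOfEntries = 0;  // stands in for taosArrayClear(pSendInfo)
  pthread_mutex_unlock(&s->lock);
}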
 
+static SStreamDispatchReq* createDispatchDataReq(SStreamTask* pTask, const SStreamDataBlock* pData) {
+  int32_t code = 0;
+  int32_t type = pTask->outputInfo.type;
+  int32_t num = streamTaskGetNumOfDownstream(pTask);
+
+  ASSERT(type == TASK_OUTPUT__SHUFFLE_DISPATCH || type == TASK_OUTPUT__FIXED_DISPATCH);
+
+  SStreamDispatchReq* pReqs = taosMemoryCalloc(num, sizeof(SStreamDispatchReq));
+  if (pReqs == NULL) {
+    terrno = TSDB_CODE_OUT_OF_MEMORY;
+    return NULL;
+  }
+
+  if (type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
+    SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
+    int32_t numOfVgroups = taosArrayGetSize(vgInfo);
+
+    for (int32_t i = 0; i < numOfVgroups; i++) {
+      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
+      code = tInitStreamDispatchReq(&pReqs[i], pTask, pData->srcVgId, 0, pVgInfo->taskId, pData->type);
+      if (code != TSDB_CODE_SUCCESS) {
+        destroyDispatchMsg(pReqs, numOfVgroups);
+        terrno = code;
+        return NULL;
+      }
+    }
+  } else {
+    int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
+    int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
+
+    code = tInitStreamDispatchReq(pReqs, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, pData->type);
+    if (code != TSDB_CODE_SUCCESS) {
+      taosMemoryFree(pReqs);
+      terrno = code;
+      return NULL;
+    }
+  }
+
+  return pReqs;
+}
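One layout assumption runs through the whole patch: for shuffle dispatch, the pReqs array allocated above is index-aligned with pVgroupInfos, so pReqs[j] always targets the vgroup at taosArrayGet(vgInfo, j). A hedged, self-contained sketch of that invariant (simplified stand-in types):

#include <stdint.h>
#include <stdio.h>

typedef struct { int32_t vgId; } VgInfoSketch;
typedef struct { int32_t blockNum; } ReqSketch;

int main(void) {
  VgInfoSketch vgs[3] = {{2}, {3}, {4}};
  ReqSketch    reqs[3] = {{0}, {0}, {0}};  // one request per vgroup, same index
  reqs[1].blockNum++;                      // a block routed to vgs[1], i.e. vgId 3
  printf("vgId %d gets %d block(s)\n", (int)vgs[1].vgId, (int)reqs[1].blockNum);
  return 0;
}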
 
 static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pData) {
   int32_t code = 0;
+  int64_t now = taosGetTimestampMs();
   int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
   ASSERT(numOfBlocks != 0 && pTask->msgInfo.pData == NULL);
@@ -247,48 +290,28 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pData) {
     pTask->msgInfo.transId = p->info.window.ekey;
   }
 
+  SStreamDispatchReq* pReqs = createDispatchDataReq(pTask, pData);
+  if (pReqs == NULL) {
+    stError("s-task:%s failed to create dispatch req", pTask->id.idStr);
+    return terrno;
+  }
+
   if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
-    SStreamDispatchReq* pReq = taosMemoryCalloc(1, sizeof(SStreamDispatchReq));
-
-    int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
-    code = tInitStreamDispatchReq(pReq, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, pData->type);
-    if (code != TSDB_CODE_SUCCESS) {
-      taosMemoryFree(pReq);
-      return code;
-    }
-
     for (int32_t i = 0; i < numOfBlocks; i++) {
       SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
-      code = streamAddBlockIntoDispatchMsg(pDataBlock, pReq);
+      code = streamAddBlockIntoDispatchMsg(pDataBlock, pReqs);
       if (code != TSDB_CODE_SUCCESS) {
-        destroyDispatchMsg(pReq, 1);
+        destroyDispatchMsg(pReqs, 1);
        return code;
       }
     }
 
-    pTask->msgInfo.pData = pReq;
+    pTask->msgInfo.pData = pReqs;
   } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
-    int32_t rspCnt = atomic_load_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt);
-    ASSERT(rspCnt == 0);
-
     SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
     int32_t numOfVgroups = taosArrayGetSize(vgInfo);
 
-    SStreamDispatchReq* pReqs = taosMemoryCalloc(numOfVgroups, sizeof(SStreamDispatchReq));
-    if (pReqs == NULL) {
-      terrno = TSDB_CODE_OUT_OF_MEMORY;
-      return -1;
-    }
-
-    for (int32_t i = 0; i < numOfVgroups; i++) {
-      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
-      code = tInitStreamDispatchReq(&pReqs[i], pTask, pData->srcVgId, 0, pVgInfo->taskId, pData->type);
-      if (code != TSDB_CODE_SUCCESS) {
-        destroyDispatchMsg(pReqs, numOfVgroups);
-        return code;
-      }
-    }
-
     for (int32_t i = 0; i < numOfBlocks; i++) {
       SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
@@ -304,7 +327,12 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pData) {
 
       // it's a new vnode to receive dispatch msg, so add one
       if (pReqs[j].blockNum == 0) {
         atomic_add_fetch_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 1);
+        SVgroupInfo* pDstVgroupInfo = taosArrayGet(vgInfo, j);
+        SDispatchEntry entry = {.nodeId = pDstVgroupInfo->vgId, .rspTs = -1, .status = 0, .sendTs = now};
+        taosThreadMutexLock(&pTask->msgInfo.lock);
+        taosArrayPush(pTask->msgInfo.pSendInfo, &entry);
+        taosThreadMutexUnlock(&pTask->msgInfo.lock);
       }
 
       pReqs[j].blockNum++;
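The blockNum == 0 test above is what keeps pSendInfo deduplicated: an entry is registered only the first time a block lands in a given vgroup's request, however many blocks follow. A hedged sketch of the same register-once pattern (stand-in types, not TDengine's):

#include <stdint.h>

typedef struct { int32_t nodeId; int64_t sendTs; } EntrySketch;
typedef struct { int32_t blockNum; } ReqSketch;

// returns 1 when this destination is seen for the first time
static int routeBlock(ReqSketch* req, EntrySketch* entry, int32_t vgId, int64_t now) {
  int first = (req->blockNum == 0);
  if (first) {             // first block for this vgroup:
    entry->nodeId = vgId;  // remember the destination once
    entry->sendTs = now;
  }
  req->blockNum++;         // every block still counts
  return first;
}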
@@ -313,7 +341,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pData) {
         continue;
       }
 
-      code = streamSearchAndAddBlock(pTask, pReqs, pDataBlock, numOfVgroups, pDataBlock->info.id.groupId);
+      code = streamSearchAndAddBlock(pTask, pReqs, pDataBlock, pDataBlock->info.id.groupId, now);
       if (code != 0) {
         destroyDispatchMsg(pReqs, numOfVgroups);
         return code;
@@ -327,9 +355,9 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pData) {
     stDebug("s-task:%s build dispatch msg success, msgId:%d, stage:%" PRId64 " %p", pTask->id.idStr,
             pTask->execInfo.dispatch, pTask->pMeta->stage, pTask->msgInfo.pData);
   } else {
+    int32_t numOfBranches = taosArrayGetSize(pTask->msgInfo.pSendInfo);
     stDebug("s-task:%s build dispatch msg success, msgId:%d, stage:%" PRId64 " dstVgNum:%d %p", pTask->id.idStr,
-            pTask->execInfo.dispatch, pTask->pMeta->stage, pTask->outputInfo.shuffleDispatcher.waitingRspCnt,
-            pTask->msgInfo.pData);
+            pTask->execInfo.dispatch, pTask->pMeta->stage, numOfBranches, pTask->msgInfo.pData);
   }
 
   return code;
@@ -337,8 +365,8 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pData) {
 
 static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatchMsg) {
   int32_t code = 0;
-  int32_t msgId = pTask->execInfo.dispatch;
   const char* id = pTask->id.idStr;
+  int32_t msgId = pTask->msgInfo.msgId;
 
   if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
     int32_t vgId = pTask->outputInfo.fixedDispatcher.nodeId;
@@ -352,10 +380,10 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatchMsg) {
   } else {
     SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
     int32_t numOfVgroups = taosArrayGetSize(vgInfo);
+    int32_t numOfBranches = taosArrayGetSize(pTask->msgInfo.pSendInfo);
 
-    int32_t actualVgroups = pTask->outputInfo.shuffleDispatcher.waitingRspCnt;
     stDebug("s-task:%s (child taskId:%d) start to shuffle-dispatch blocks to %d/%d vgroup(s), msgId:%d", id,
-            pTask->info.selfChildId, actualVgroups, numOfVgroups, msgId);
+            pTask->info.selfChildId, numOfBranches, numOfVgroups, msgId);
 
     int32_t numOfSend = 0;
     for (int32_t i = 0; i < numOfVgroups; i++) {
@@ -370,7 +398,7 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatchMsg) {
       }
 
       // no need to try remain, all already send.
-      if (++numOfSend == actualVgroups) {
+      if (++numOfSend == numOfBranches) {
         break;
       }
     }
@@ -382,102 +410,154 @@
   return code;
 }
 
-static void doRetryDispatchData(void* param, void* tmrId) {
-  SStreamTask* pTask = param;
-  const char* id = pTask->id.idStr;
-  int32_t msgId = pTask->execInfo.dispatch;
+static void setNotInDispatchMonitor(SDispatchMsgInfo* pMsgInfo) {
+  taosThreadMutexLock(&pMsgInfo->lock);
+  pMsgInfo->inMonitor = 0;
+  taosThreadMutexUnlock(&pMsgInfo->lock);
+}
 
+static void setResendInfo(SDispatchEntry* pEntry, int64_t now) {
+  pEntry->sendTs = now;
+  pEntry->rspTs = -1;
+  pEntry->retryCount += 1;
+}
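Together with isDispatchRspTimeout further down, setResendInfo defines the retry clock: every resend restamps sendTs and clears rspTs, so the timeout window restarts per attempt rather than per message. A worked timeline under that assumption (times in ms, illustrative only):

/* attempt 1: sendTs = 1000, rspTs = -1
 * t = 31001: 31001 - 1000 > 30000 and rspTs == -1  -> timed out, resend
 * resend:    sendTs = 31001, rspTs = -1, retryCount = 1
 * t = 32000: rsp arrives -> rspTs = 32000, entry no longer counted as failed */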
 
+static void doSendFailedDispatch(SStreamTask* pTask, SDispatchEntry* pEntry, int64_t now, const char* pMsg) {
+  SStreamDispatchReq* pReq = pTask->msgInfo.pData;
+
+  int32_t msgId = pTask->msgInfo.msgId;
+  SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
+  int32_t numOfVgroups = taosArrayGetSize(vgInfo);
+
+  setResendInfo(pEntry, now);
+  for (int32_t j = 0; j < numOfVgroups; ++j) {
+    SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
+    if (pVgInfo->vgId == pEntry->nodeId) {
+      int32_t code = doSendDispatchMsg(pTask, &pReq[j], pVgInfo->vgId, &pVgInfo->epSet);
+      stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d for %s, msgId:%d, code:%s",
+              pTask->id.idStr, pTask->info.selfChildId, pReq[j].blockNum, pVgInfo->vgId, pMsg, msgId, tstrerror(code));
+      break;
+    }
+  }
+}
 
+static void doMonitorDispatchData(void* param, void* tmrId) {
+  SStreamTask* pTask = param;
+  const char* id = pTask->id.idStr;
+  int32_t vgId = pTask->pMeta->vgId;
+  SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo;
+  int32_t msgId = pMsgInfo->msgId;
+  int32_t code = 0;
+  int64_t now = taosGetTimestampMs();
+
+  stDebug("s-task:%s start monitor dispatch data", id);
+
   if (streamTaskShouldStop(pTask)) {
     int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
     stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref);
+    setNotInDispatchMonitor(pMsgInfo);
     return;
   }
 
-  ASSERT(pTask->outputq.status == TASK_OUTPUT_STATUS__WAIT);
+  // slave task not handle the dispatch, downstream not ready will break the monitor timer
   // follower not handle the dispatch rsp
   if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) {
     int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
     stError("s-task:%s vgId:%d follower or downstream not ready, jump out of monitor tmr, ref:%d", id, vgId, ref);
+    setNotInDispatchMonitor(pMsgInfo);
     return;
   }
 
-  int32_t code = 0;
+  taosThreadMutexLock(&pMsgInfo->lock);
+  if (pTask->outputq.status == TASK_OUTPUT_STATUS__NORMAL) {
+    int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
+    stDebug("s-task:%s not in dispatch procedure, abort from timer, ref:%d", pTask->id.idStr, ref);
+
+    pTask->msgInfo.inMonitor = 0;
+    taosThreadMutexUnlock(&pMsgInfo->lock);
+    return;
+  }
+  taosThreadMutexUnlock(&pMsgInfo->lock);
+
+  int32_t numOfFailed = getFailedDispatchInfo(pMsgInfo, now);
+  if (numOfFailed == 0) {
+    stDebug("s-task:%s no error occurs, check again in %dms", id, DISPATCH_RETRY_INTERVAL_MS);
+    streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
+    return;
+  }
 
   {
-    SArray* pList = taosArrayDup(pTask->msgInfo.pRetryList, NULL);
-    taosArrayClear(pTask->msgInfo.pRetryList);
-
     SStreamDispatchReq* pReq = pTask->msgInfo.pData;
 
     if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
-      SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
-      int32_t numOfVgroups = taosArrayGetSize(vgInfo);
+      stDebug("s-task:%s (child taskId:%d) retry shuffle-dispatch to down streams, msgId:%d", id,
+              pTask->info.selfChildId, msgId);
 
-      int32_t numOfFailed = taosArrayGetSize(pList);
-      stDebug("s-task:%s (child taskId:%d) retry shuffle-dispatch blocks to %d vgroup(s), msgId:%d", id,
-              pTask->info.selfChildId, numOfFailed, msgId);
+      int32_t numOfRetry = 0;
+      for (int32_t i = 0; i < taosArrayGetSize(pTask->msgInfo.pSendInfo); ++i) {
+        SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, i);
+        if (pEntry->status == TSDB_CODE_SUCCESS && pEntry->rspTs > 0) {
+          continue;
+        }
 
-      for (int32_t i = 0; i < numOfFailed; i++) {
-        int32_t vgId = *(int32_t*)taosArrayGet(pList, i);
+        // downstream not rsp yet beyond threshold that is 10s
+        if (isDispatchRspTimeout(pEntry, now)) {  // not respond yet beyonds 30s, re-send data
+          doSendFailedDispatch(pTask, pEntry, now, "timeout");
+          numOfRetry += 1;
+          continue;
+        }
 
-        for (int32_t j = 0; j < numOfVgroups; ++j) {
-          SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
-          if (pVgInfo->vgId == vgId) {
-            stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr,
-                    pTask->info.selfChildId, pReq[j].blockNum, pVgInfo->vgId);
+        // downstream inputQ is closed
+        if (pEntry->status == TASK_INPUT_STATUS__BLOCKED) {
+          doSendFailedDispatch(pTask, pEntry, now, "downstream inputQ blocked");
+          numOfRetry += 1;
+          continue;
+        }
 
-            code = doSendDispatchMsg(pTask, &pReq[j], pVgInfo->vgId, &pVgInfo->epSet);
-            if (code < 0) {
-              break;
-            }
-          }
-        }
+        // handle other errors
+        if (pEntry->status != TSDB_CODE_SUCCESS) {
+          doSendFailedDispatch(pTask, pEntry, now, "downstream error");
+          numOfRetry += 1;
+        }
       }
 
       stDebug("s-task:%s complete retry shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr,
-              numOfFailed, msgId);
+              numOfRetry, msgId);
     } else {
-      int32_t vgId = pTask->outputInfo.fixedDispatcher.nodeId;
+      int32_t dstVgId = pTask->outputInfo.fixedDispatcher.nodeId;
       SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet;
       int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
 
-      stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), msgId:%d", id,
-              pTask->info.selfChildId, 1, downstreamTaskId, vgId, msgId);
+      ASSERT(taosArrayGetSize(pTask->msgInfo.pSendInfo) == 1);
+      SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, 0);
 
-      code = doSendDispatchMsg(pTask, pReq, vgId, pEpSet);
+      setResendInfo(pEntry, now);
+      code = doSendDispatchMsg(pTask, pReq, dstVgId, pEpSet);
+
+      stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), msgId:%d, code:%s", id,
+              pTask->info.selfChildId, 1, downstreamTaskId, dstVgId, msgId, tstrerror(code));
     }
 
-    taosArrayDestroy(pList);
   }
 
-  if (code != TSDB_CODE_SUCCESS) {
-    if (!streamTaskShouldStop(pTask)) {
-      // stDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr);
-      // atomic_store_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 0);
-      if (streamTaskShouldPause(pTask)) {
-        streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS * 10);
-      } else {
-        streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
-      }
-    } else {
-      int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
-      stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref);
-    }
-  } else {
-    if (streamTaskShouldStop(pTask)) {
-      int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
-      stDebug("s-task:%s send success, jump out of timer, ref:%d", pTask->id.idStr, ref);
-    }
-  }
-}
+  if (streamTaskShouldStop(pTask)) {
+    int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
+    stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref);
+    setNotInDispatchMonitor(pMsgInfo);
+  } else {
+    streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
+  }
+}
 
-void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration) {
-  pTask->msgInfo.retryCount++;
-
-  stTrace("s-task:%s retry send dispatch data in %" PRId64 "ms, in timer msgId:%d, retryTimes:%d", pTask->id.idStr,
-          waitDuration, pTask->execInfo.dispatch, pTask->msgInfo.retryCount);
-
-  if (pTask->msgInfo.pRetryTmr != NULL) {
-    taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamTimer, &pTask->msgInfo.pRetryTmr);
-  } else {
-    pTask->msgInfo.pRetryTmr = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamTimer);
-  }
-}
+void streamStartMonitorDispatchData(SStreamTask* pTask, int64_t waitDuration) {
+  if (pTask->msgInfo.pRetryTmr != NULL) {
+    taosTmrReset(doMonitorDispatchData, waitDuration, pTask, streamTimer, &pTask->msgInfo.pRetryTmr);
+  } else {
+    pTask->msgInfo.pRetryTmr = taosTmrStart(doMonitorDispatchData, waitDuration, pTask, streamTimer);
+  }
+}
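streamStartMonitorDispatchData reuses the reset-or-start idiom: re-arm the existing handle via taosTmrReset when one is live, otherwise create it with taosTmrStart, so each task owns at most one monitor timer. A hedged generic sketch of the idiom (hypothetical timer API, not the taos one):

#include <stdint.h>

typedef void (*timer_fn)(void* param, void* tmrId);
typedef struct TimerHandle TimerHandle;  // opaque, hypothetical

extern TimerHandle* timer_start(timer_fn fn, int64_t delayMs, void* param);                    // hypothetical
extern void         timer_reset(timer_fn fn, int64_t delayMs, void* param, TimerHandle** pH);  // hypothetical

static void armOnce(TimerHandle** pTmr, timer_fn fn, int64_t delayMs, void* param) {
  if (*pTmr != NULL) {
    timer_reset(fn, delayMs, param, pTmr);    // re-arm the existing handle
  } else {
    *pTmr = timer_start(fn, delayMs, param);  // first arm
  }
}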
 
-int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz,
-                                int64_t groupId) {
+int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock,
+                                int64_t groupId, int64_t now) {
   uint32_t hashValue = 0;
   SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
   if (pTask->pNameMap == NULL) {
@@ -495,23 +575,24 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz,
   } else {
     char ctbName[TSDB_TABLE_FNAME_LEN] = {0};
     if (pDataBlock->info.parTbName[0]) {
-      if(pTask->subtableWithoutMd5 != 1 &&
-         !isAutoTableName(pDataBlock->info.parTbName) &&
-         !alreadyAddGroupId(pDataBlock->info.parTbName, groupId) &&
-         groupId != 0){
-        if(pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER){
+      if (pTask->subtableWithoutMd5 != 1 && !isAutoTableName(pDataBlock->info.parTbName) &&
+          !alreadyAddGroupId(pDataBlock->info.parTbName, groupId) && groupId != 0) {
+        if (pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
           buildCtbNameAddGroupId(NULL, pDataBlock->info.parTbName, groupId);
-        }else if(pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
+        } else if (pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
           buildCtbNameAddGroupId(pTask->outputInfo.shuffleDispatcher.stbFullName, pDataBlock->info.parTbName, groupId);
         }
       }
     } else {
       buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName);
     }
-    snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);
 
+    snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db,
+             pDataBlock->info.parTbName);
     /*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/
     SUseDbRsp* pDbInfo = &pTask->outputInfo.shuffleDispatcher.dbInfo;
-    hashValue = taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix);
+    hashValue =
+        taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix);
     SBlockName bln = {0};
     bln.hashValue = hashValue;
     memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName));
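The routing rule this function implements: hash the fully qualified child table name, then find the vgroup whose [hashBegin, hashEnd] range contains the hash. A hedged, self-contained sketch of the range scan (stand-in types; the real hash comes from taosGetTbHashVal):

#include <stdint.h>

typedef struct { uint32_t hashBegin, hashEnd; int32_t vgId; } VgRangeSketch;

static int32_t pickVgroup(const VgRangeSketch* vgs, int32_t num, uint32_t hash) {
  for (int32_t i = 0; i < num; ++i) {
    if (hash >= vgs[i].hashBegin && hash <= vgs[i].hashEnd) {
      return vgs[i].vgId;  // ranges partition the hash space: one match
    }
  }
  return -1;  // no range matched; the caller treats this as an error
}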
@@ -520,20 +601,25 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz,
     }
   }
 
-  bool found = false;
+  bool    found = false;
+  int32_t numOfVgroups = taosArrayGetSize(vgInfo);
 
-  // TODO: optimize search
-  int32_t j;
-  for (j = 0; j < vgSz; j++) {
+  taosThreadMutexLock(&pTask->msgInfo.lock);
+
+  for (int32_t j = 0; j < numOfVgroups; j++) {
     SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
     ASSERT(pVgInfo->vgId > 0);
 
     if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
       if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
+        taosThreadMutexUnlock(&pTask->msgInfo.lock);
         return -1;
       }
 
       if (pReqs[j].blockNum == 0) {
-        atomic_add_fetch_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 1);
+        SVgroupInfo* pDstVgroupInfo = taosArrayGet(vgInfo, j);
+        SDispatchEntry entry = {.nodeId = pDstVgroupInfo->vgId, .rspTs = -1, .status = 0, .sendTs = now};
+        taosArrayPush(pTask->msgInfo.pSendInfo, &entry);
       }
 
       pReqs[j].blockNum++;
@@ -541,10 +627,28 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz,
       break;
     }
   }
+  taosThreadMutexUnlock(&pTask->msgInfo.lock);
 
   ASSERT(found);
   return 0;
 }
 
+static void initDispatchInfo(SDispatchMsgInfo* pInfo, int32_t msgId) {
+  pInfo->startTs = taosGetTimestampMs();
+  pInfo->rspTs = -1;
+  pInfo->msgId = msgId;
+}
+
+static void clearDispatchInfo(SDispatchMsgInfo* pInfo) {
+  pInfo->startTs = -1;
+  pInfo->msgId = -1;
+  pInfo->rspTs = -1;
+}
+
+static void updateDispatchInfo(SDispatchMsgInfo* pInfo, int64_t recvTs) {
+  pInfo->rspTs = recvTs;
+}
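initDispatchInfo, updateDispatchInfo, and clearDispatchInfo give the buffered message a small lifecycle: msgId is stamped from execInfo.dispatch when a new dispatch round starts, rspTs records the last response time, and clearing resets everything to -1. One consequence, assuming msgId increases with every round: a stale response carrying an older msgId can be recognized and ignored. An illustrative state summary, not additional API:

/* lifecycle, illustrative only:
 *   init  -> { startTs = now, rspTs = -1, msgId = n }
 *   rsp   -> { rspTs = recvTs }
 *   clear -> { startTs = -1, rspTs = -1, msgId = -1 } */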
 
 int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
   ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH ||
           pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH));
@@ -587,7 +691,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
          type == STREAM_INPUT__TRANS_STATE);
 
   pTask->execInfo.dispatch += 1;
-  pTask->msgInfo.startTs = taosGetTimestampMs();
+  initDispatchInfo(&pTask->msgInfo, pTask->execInfo.dispatch);
 
   int32_t code = doBuildDispatchMsg(pTask, pBlock);
   if (code == 0) {
@@ -599,34 +703,21 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
     streamTaskInitTriggerDispatchInfo(pTask);
   }
 
-  int32_t retryCount = 0;
-  while (1) {
-    code = sendDispatchMsg(pTask, pTask->msgInfo.pData);
-    if (code == TSDB_CODE_SUCCESS) {
-      break;
-    }
+  code = sendDispatchMsg(pTask, pTask->msgInfo.pData);
 
-    stDebug("s-task:%s failed to dispatch msg:%d to downstream, code:%s, output status:%d, retry cnt:%d", id,
-            pTask->execInfo.dispatch, tstrerror(terrno), pTask->outputq.status, retryCount);
-
-    // todo deal with only partially success dispatch case
-    atomic_store_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 0);
-    if (terrno == TSDB_CODE_APP_IS_STOPPING) {  // in case of this error, do not retry anymore
-      clearBufferedDispatchMsg(pTask);
-      return code;
-    }
-
-    if (++retryCount > MAX_CONTINUE_RETRY_COUNT) {  // add to timer to retry
-      int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
-      stDebug(
-          "s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d",
-          pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref);
-
-      streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
-      break;
-    }
+  taosThreadMutexLock(&pTask->msgInfo.lock);
+  if (pTask->msgInfo.inMonitor == 0) {
+    int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
+    stDebug("s-task:%s start dispatch monitor tmr in %dms, ref:%d, dispatch code:%s", id, DISPATCH_RETRY_INTERVAL_MS, ref,
+            tstrerror(code));
+    streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
+    pTask->msgInfo.inMonitor = 1;
+  } else {
+    stDebug("s-task:%s already in dispatch monitor tmr", id);
   }
+  taosThreadMutexUnlock(&pTask->msgInfo.lock);
 
   // this block can not be deleted until it has been sent to downstream task successfully.
   return TSDB_CODE_SUCCESS;
 }
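The shape of this change is worth spelling out: the old synchronous retry-until-MAX_CONTINUE_RETRY_COUNT loop is gone, replaced by exactly one send attempt plus an idempotent monitor registration guarded by inMonitor. A hedged pseudocode summary of the new split, as comments rather than the real API:

/* dispatch path:               monitor timer, every DISPATCH_RETRY_INTERVAL_MS:
 *   build msg, send once         if output queue back to NORMAL -> stop monitor
 *   arm monitor if not armed     if no failed/timed-out entries -> re-check later
 *                                else resend only the failed entries, re-arm */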
@@ -817,8 +908,10 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) {
   int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
   ASSERT(dataStrLen > 0);
 
   void* buf = taosMemoryCalloc(1, dataStrLen);
-  if (buf == NULL) return -1;
+  if (buf == NULL) {
+    return -1;
+  }
 
   SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
   pRetrieve->useconds = 0;
@@ -1031,23 +1124,6 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId, int32_t downstreamNodeId) {
   stDebug("s-task:%s destroy dispatch msg:%p", pTask->id.idStr, pTask->msgInfo.pData);
 
-  bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER);
-  if (delayDispatch) {
-    taosThreadMutexLock(&pTask->lock);
-    // we only set the dispatch msg info for current checkpoint trans
-    if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK &&
-        pTask->chkInfo.pActiveInfo->activeId == pTask->msgInfo.checkpointId) {
-      ASSERT(pTask->chkInfo.pActiveInfo->transId == pTask->msgInfo.transId);
-      stDebug("s-task:%s checkpoint-trigger msg to 0x%x rsp for checkpointId:%" PRId64 " transId:%d confirmed",
-              pTask->id.idStr, downstreamId, pTask->msgInfo.checkpointId, pTask->msgInfo.transId);
-
-      streamTaskSetTriggerDispatchConfirmed(pTask, downstreamNodeId);
-    } else {
-      stWarn("s-task:%s checkpoint-trigger msg rsp for checkpointId:%" PRId64 " transId:%d discard, since expired",
-             pTask->id.idStr, pTask->msgInfo.checkpointId, pTask->msgInfo.transId);
-    }
-    taosThreadMutexUnlock(&pTask->lock);
-  }
-
   clearBufferedDispatchMsg(pTask);
 
   int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs;
@@ -1074,17 +1150,55 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId, int32_t downstreamNodeId) {
   return 0;
 }
 
-int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
-  const char* id = pTask->id.idStr;
-  int32_t vgId = pTask->pMeta->vgId;
-  int32_t msgId = pTask->execInfo.dispatch;
-
-#if 0
-  // for test purpose, build the failure case
-  if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER) {
-    pRsp->inputStatus = TASK_INPUT_STATUS__REFUSED;
-  }
-#endif
+static int32_t setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t code, int64_t now, const char* id) {
+  int32_t numOfRsp = 0;
+  bool alreadySet = false;
+
+  taosThreadMutexLock(&pMsgInfo->lock);
+  for (int32_t j = 0; j < taosArrayGetSize(pMsgInfo->pSendInfo); ++j) {
+    SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, j);
+    if (pEntry->nodeId == vgId) {
+      ASSERT(!alreadySet);
+      pEntry->rspTs = now;
+      pEntry->status = code;
+      alreadySet = true;
+      stDebug("s-task:%s record the rsp recv, ts:%" PRId64 " code:%d, idx:%d", id, now, code, j);
+    }
+
+    if (pEntry->rspTs != -1) {
+      numOfRsp += 1;
+    }
+  }
+
+  taosThreadMutexUnlock(&pMsgInfo->lock);
+  return numOfRsp;
+}
 
+bool isDispatchRspTimeout(SDispatchEntry* pEntry, int64_t now) {
+  return (pEntry->rspTs == -1) && (now - pEntry->sendTs) > 30 * 1000;
+}
+
+int32_t getFailedDispatchInfo(SDispatchMsgInfo* pMsgInfo, int64_t now) {
+  int32_t numOfFailed = 0;
+  taosThreadMutexLock(&pMsgInfo->lock);
+
+  for (int32_t j = 0; j < taosArrayGetSize(pMsgInfo->pSendInfo); ++j) {
+    SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, j);
+    if (pEntry->status != TSDB_CODE_SUCCESS || isDispatchRspTimeout(pEntry, now)) {
+      numOfFailed += 1;
+    }
+  }
+  taosThreadMutexUnlock(&pMsgInfo->lock);
+  return numOfFailed;
+}
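getFailedDispatchInfo defines "failed" as: downstream reported a non-success status, or no response arrived within 30s of the last send. A hedged self-contained sketch of that classification (a simplified entry record, matching the illustrative one near the top of this patch):

#include <stdbool.h>
#include <stdint.h>

typedef struct { int64_t sendTs, rspTs; int32_t status; } EntrySketch;

static bool entryFailed(const EntrySketch* e, int64_t nowMs) {
  bool timedOut = (e->rspTs == -1) && (nowMs - e->sendTs > 30 * 1000);
  return (e->status != 0) || timedOut;  // 0 stands in for TSDB_CODE_SUCCESS
}

static int32_t countFailed(const EntrySketch* es, int32_t num, int64_t nowMs) {
  int32_t failed = 0;
  for (int32_t i = 0; i < num; ++i) {
    if (entryFailed(&es[i], nowMs)) failed++;
  }
  return failed;
}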
 
+int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
+  const char* id = pTask->id.idStr;
+  int32_t vgId = pTask->pMeta->vgId;
+  SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo;
+  int32_t msgId = pMsgInfo->msgId;
+  int64_t now = taosGetTimestampMs();
+  int32_t totalRsp = 0;
 
   // follower not handle the dispatch rsp
   if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) {
@@ -1109,53 +1223,61 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
     if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) {  // destination task does not exist, not retry anymore
       stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), msgId:%d no retry, since task destroyed already",
               id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, msgId);
+      totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, id);
     } else {
       stError("s-task:%s failed to dispatch msgId:%d to task:0x%x(vgId:%d), code:%s, add to retry list", id, msgId,
               pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code));
-      taosThreadMutexLock(&pTask->lock);
-      taosArrayPush(pTask->msgInfo.pRetryList, &pRsp->downstreamNodeId);
-      taosThreadMutexUnlock(&pTask->lock);
+      totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, code, now, id);
     }
   } else {  // code == 0
     if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
       pTask->inputq.status = TASK_INPUT_STATUS__BLOCKED;
       // block the input of current task, to push pressure to upstream
-      taosThreadMutexLock(&pTask->lock);
-      taosArrayPush(pTask->msgInfo.pRetryList, &pRsp->downstreamNodeId);
-      taosThreadMutexUnlock(&pTask->lock);
-
-      stTrace("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for %dms and retry dispatch", id,
-              pRsp->downstreamTaskId, pRsp->downstreamNodeId, DISPATCH_RETRY_INTERVAL_MS);
-    } else if (pRsp->inputStatus == TASK_INPUT_STATUS__REFUSED) {
-      // todo handle the agg task failure, add test case
-      if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER &&
-          pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
-        stError("s-task:%s failed to dispatch checkpoint-trigger msg, checkpointId:%" PRId64
-                ", set the current checkpoint failed, and send rsp to mnode",
-                id, pTask->chkInfo.pActiveInfo->activeId);
-        {  // send checkpoint failure msg to mnode directly
-          pTask->chkInfo.pActiveInfo->failedId = pTask->chkInfo.pActiveInfo->activeId;  // record the latest failed checkpoint id
-          pTask->chkInfo.pActiveInfo->activeId = pTask->chkInfo.pActiveInfo->activeId;
-          streamTaskSendCheckpointSourceRsp(pTask);
-        }
-      } else {
-        stError("s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, treat it as success", id,
-                pRsp->downstreamTaskId, pRsp->downstreamNodeId);
-      }
-    }
-  }
+      totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, pRsp->inputStatus, now, id);
+      stTrace("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for retry dispatch", id,
+              pRsp->downstreamTaskId, pRsp->downstreamNodeId);
+    } else {
+      if (pRsp->inputStatus == TASK_INPUT_STATUS__REFUSED) {
+        // todo handle the role-changed during checkpoint generation, add test case
+        stError(
+            "s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, downstream may become follower or "
+            "restart already, treat it as success",
+            id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
+      }
+      totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, id);
+
+      {
+        bool delayDispatch = (pMsgInfo->dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER);
+        if (delayDispatch) {
+          taosThreadMutexLock(&pTask->lock);
+          // we only set the dispatch msg info for current checkpoint trans
+          if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK &&
+              pTask->chkInfo.pActiveInfo->activeId == pMsgInfo->checkpointId) {
+            ASSERT(pTask->chkInfo.pActiveInfo->transId == pMsgInfo->transId);
+            stDebug("s-task:%s checkpoint-trigger msg to 0x%x rsp for checkpointId:%" PRId64 " transId:%d confirmed",
+                    pTask->id.idStr, pRsp->downstreamTaskId, pMsgInfo->checkpointId, pMsgInfo->transId);
+
+            streamTaskSetTriggerDispatchConfirmed(pTask, pRsp->downstreamNodeId);
+          } else {
+            stWarn("s-task:%s checkpoint-trigger msg rsp for checkpointId:%" PRId64
+                   " transId:%d discard, since expired",
+                   pTask->id.idStr, pMsgInfo->checkpointId, pMsgInfo->transId);
+          }
+          taosThreadMutexUnlock(&pTask->lock);
+        }
+      }
+    }
+  }
 
-  int32_t leftRsp = 0;
+  int32_t notRsp = taosArrayGetSize(pMsgInfo->pSendInfo) - totalRsp;
 
-  if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
-    leftRsp = atomic_sub_fetch_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 1);
-    ASSERT(leftRsp >= 0);
-
-    if (leftRsp > 0) {
+  if (notRsp > 0) {
     stDebug(
         "s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, waiting "
        "for %d rsp",
-        id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code), leftRsp);
+        id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code), notRsp);
   } else {
     stDebug(
        "s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, all rsp",
@@ -1166,31 +1288,17 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
         msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code));
   }
 
-  ASSERT(leftRsp >= 0);
-
   // all msg rsp already, continue
-  if (leftRsp == 0) {
+  if (notRsp == 0) {
     ASSERT(pTask->outputq.status == TASK_OUTPUT_STATUS__WAIT);
 
-    // we need to re-try send dispatch msg to downstream tasks
-    int32_t numOfFailed = taosArrayGetSize(pTask->msgInfo.pRetryList);
-    if (numOfFailed > 0) {
-      if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
-        atomic_store_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, numOfFailed);
-        stDebug("s-task:%s waiting rsp set to be %d", id, pTask->outputInfo.shuffleDispatcher.waitingRspCnt);
-      }
-
-      int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
-      stDebug("s-task:%s failed to dispatch msg to downstream, add into timer to retry in %dms, ref:%d",
-              pTask->id.idStr, DISPATCH_RETRY_INTERVAL_MS, ref);
-
-      streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
-    } else {  // this message has been sent successfully, let's try next one.
-      pTask->msgInfo.retryCount = 0;
-
+    int32_t numOfFailed = getFailedDispatchInfo(pMsgInfo, now);
+    if (numOfFailed == 0) {  // this message has been sent successfully, let's try next one.
       // trans-state msg has been sent to downstream successfully. let's transfer the fill-history task state
-      if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__TRANS_STATE) {
-        stDebug("s-task:%s dispatch trans-state msgId:%d to downstream successfully, start to prepare transfer state", id, msgId);
+      if (pMsgInfo->dispatchMsgType == STREAM_INPUT__TRANS_STATE) {
+        stDebug("s-task:%s dispatch trans-state msgId:%d to downstream successfully, start to prepare transfer state",
+                id, msgId);
         ASSERT(pTask->info.fillHistory == 1);
 
         code = streamTransferStatePrepare(pTask);
@@ -1312,4 +1420,3 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
 
   return 0;
 }
-