refactor(stream): refactor send msg.
This commit is contained in:
parent
498519c94d
commit
2b16976902
|
@ -290,8 +290,11 @@ typedef struct SSTaskBasicInfo {
|
||||||
int64_t triggerParam; // in msec
|
int64_t triggerParam; // in msec
|
||||||
} SSTaskBasicInfo;
|
} SSTaskBasicInfo;
|
||||||
|
|
||||||
|
typedef struct SStreamDispatchReq SStreamDispatchReq;
|
||||||
|
|
||||||
typedef struct SDispatchMsgInfo {
|
typedef struct SDispatchMsgInfo {
|
||||||
void* pData; // current dispatch data
|
SStreamDispatchReq* pData; // current dispatch data
|
||||||
|
int8_t dispatchMsgType;
|
||||||
int16_t msgType; // dispatch msg type
|
int16_t msgType; // dispatch msg type
|
||||||
int32_t retryCount; // retry send data count
|
int32_t retryCount; // retry send data count
|
||||||
int64_t blockingTs; // output blocking timestamp
|
int64_t blockingTs; // output blocking timestamp
|
||||||
|
@ -327,6 +330,7 @@ typedef struct {
|
||||||
int64_t step2Start;
|
int64_t step2Start;
|
||||||
int64_t start;
|
int64_t start;
|
||||||
int32_t updateCount;
|
int32_t updateCount;
|
||||||
|
int32_t dispatchCount;
|
||||||
int64_t latestUpdateTs;
|
int64_t latestUpdateTs;
|
||||||
} STaskExecStatisInfo;
|
} STaskExecStatisInfo;
|
||||||
|
|
||||||
|
@ -442,7 +446,7 @@ typedef struct {
|
||||||
int32_t taskId;
|
int32_t taskId;
|
||||||
} SStreamTaskRunReq;
|
} SStreamTaskRunReq;
|
||||||
|
|
||||||
typedef struct {
|
struct SStreamDispatchReq {
|
||||||
int32_t type;
|
int32_t type;
|
||||||
int64_t stage; // nodeId from upstream task
|
int64_t stage; // nodeId from upstream task
|
||||||
int64_t streamId;
|
int64_t streamId;
|
||||||
|
@ -455,7 +459,7 @@ typedef struct {
|
||||||
int64_t totalLen;
|
int64_t totalLen;
|
||||||
SArray* dataLen; // SArray<int32_t>
|
SArray* dataLen; // SArray<int32_t>
|
||||||
SArray* data; // SArray<SRetrieveTableRsp*>
|
SArray* data; // SArray<SRetrieveTableRsp*>
|
||||||
} SStreamDispatchReq;
|
};
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t streamId;
|
int64_t streamId;
|
||||||
|
|
|
@ -910,12 +910,12 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
|
||||||
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
|
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
|
||||||
tqDebug("s-task:%s status:%s, stage:%d recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d",
|
tqDebug("s-task:%s status:%s, stage:%d recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), check_status:%d",
|
||||||
pTask->id.idStr, pStatus, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
|
pTask->id.idStr, pStatus, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
|
||||||
} else {
|
} else {
|
||||||
rsp.status = TASK_DOWNSTREAM_NOT_READY;
|
rsp.status = TASK_DOWNSTREAM_NOT_READY;
|
||||||
tqDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64
|
tqDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64
|
||||||
") from task:0x%x (vgId:%d), rsp status %d",
|
") from task:0x%x (vgId:%d), rsp check_status %d",
|
||||||
req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
|
req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,6 +59,8 @@ extern int32_t streamBackendCfWrapperId;
|
||||||
|
|
||||||
void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration);
|
void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration);
|
||||||
int32_t streamDispatchStreamBlock(SStreamTask* pTask);
|
int32_t streamDispatchStreamBlock(SStreamTask* pTask);
|
||||||
|
void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups);
|
||||||
|
int32_t getNumOfDispatchBranch(SStreamTask* pTask);
|
||||||
|
|
||||||
int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock);
|
int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock);
|
||||||
SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg);
|
SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg);
|
||||||
|
|
|
@ -277,59 +277,66 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) {
|
void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups) {
|
||||||
|
for (int32_t i = 0; i < numOfVgroups; i++) {
|
||||||
|
taosArrayDestroyP(pReq[i].data, taosMemoryFree);
|
||||||
|
taosArrayDestroy(pReq[i].dataLen);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosMemoryFree(pReq);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t getNumOfDispatchBranch(SStreamTask* pTask) {
|
||||||
|
return (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH)
|
||||||
|
? 1
|
||||||
|
: taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pData) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
|
int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
|
||||||
ASSERT(numOfBlocks != 0);
|
ASSERT(numOfBlocks != 0 && pTask->msgInfo.pData == NULL);
|
||||||
|
|
||||||
|
pTask->msgInfo.dispatchMsgType = pData->type;
|
||||||
|
|
||||||
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
SStreamDispatchReq req = {0};
|
SStreamDispatchReq* pReq = taosMemoryCalloc(1, sizeof(SStreamDispatchReq));
|
||||||
|
|
||||||
int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId;
|
int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId;
|
||||||
code = tInitStreamDispatchReq(&req, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, pData->type);
|
code = tInitStreamDispatchReq(pReq, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, pData->type);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfBlocks; i++) {
|
for (int32_t i = 0; i < numOfBlocks; i++) {
|
||||||
SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
|
SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
|
||||||
|
code = streamAddBlockIntoDispatchMsg(pDataBlock, pReq);
|
||||||
code = streamAddBlockIntoDispatchMsg(pDataBlock, &req);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
taosArrayDestroyP(req.data, taosMemoryFree);
|
destroyDispatchMsg(pReq, 1);
|
||||||
taosArrayDestroy(req.dataLen);
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vgId = pTask->fixedEpDispatcher.nodeId;
|
pTask->msgInfo.pData = pReq;
|
||||||
SEpSet* pEpSet = &pTask->fixedEpDispatcher.epSet;
|
|
||||||
|
|
||||||
stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d)", pTask->id.idStr,
|
|
||||||
pTask->info.selfChildId, numOfBlocks, downstreamTaskId, vgId);
|
|
||||||
|
|
||||||
code = doSendDispatchMsg(pTask, &req, vgId, pEpSet);
|
|
||||||
taosArrayDestroyP(req.data, taosMemoryFree);
|
|
||||||
taosArrayDestroy(req.dataLen);
|
|
||||||
return code;
|
|
||||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
int32_t rspCnt = atomic_load_32(&pTask->shuffleDispatcher.waitingRspCnt);
|
int32_t rspCnt = atomic_load_32(&pTask->shuffleDispatcher.waitingRspCnt);
|
||||||
ASSERT(rspCnt == 0);
|
ASSERT(rspCnt == 0);
|
||||||
|
|
||||||
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||||
int32_t vgSz = taosArrayGetSize(vgInfo);
|
int32_t numOfVgroups = taosArrayGetSize(vgInfo);
|
||||||
|
|
||||||
SStreamDispatchReq* pReqs = taosMemoryCalloc(vgSz, sizeof(SStreamDispatchReq));
|
SStreamDispatchReq* pReqs = taosMemoryCalloc(numOfVgroups, sizeof(SStreamDispatchReq));
|
||||||
if (pReqs == NULL) {
|
if (pReqs == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < vgSz; i++) {
|
for (int32_t i = 0; i < numOfVgroups; i++) {
|
||||||
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
||||||
code = tInitStreamDispatchReq(&pReqs[i], pTask, pData->srcVgId, 0, pVgInfo->taskId, pData->type);
|
code = tInitStreamDispatchReq(&pReqs[i], pTask, pData->srcVgId, 0, pVgInfo->taskId, pData->type);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto FAIL_SHUFFLE_DISPATCH;
|
destroyDispatchMsg(pReqs, numOfVgroups);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -338,52 +345,95 @@ static int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* p
|
||||||
|
|
||||||
// TODO: do not use broadcast
|
// TODO: do not use broadcast
|
||||||
if (pDataBlock->info.type == STREAM_DELETE_RESULT || pDataBlock->info.type == STREAM_CHECKPOINT || pDataBlock->info.type == STREAM_TRANS_STATE) {
|
if (pDataBlock->info.type == STREAM_DELETE_RESULT || pDataBlock->info.type == STREAM_CHECKPOINT || pDataBlock->info.type == STREAM_TRANS_STATE) {
|
||||||
for (int32_t j = 0; j < vgSz; j++) {
|
for (int32_t j = 0; j < numOfVgroups; j++) {
|
||||||
if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
|
code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]);
|
||||||
goto FAIL_SHUFFLE_DISPATCH;
|
if (code != 0) {
|
||||||
|
destroyDispatchMsg(pReqs, numOfVgroups);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReqs[j].blockNum == 0) {
|
if (pReqs[j].blockNum == 0) {
|
||||||
atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
|
atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
pReqs[j].blockNum++;
|
pReqs[j].blockNum++;
|
||||||
}
|
}
|
||||||
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (streamSearchAndAddBlock(pTask, pReqs, pDataBlock, vgSz, pDataBlock->info.id.groupId) < 0) {
|
code = streamSearchAndAddBlock(pTask, pReqs, pDataBlock, numOfVgroups, pDataBlock->info.id.groupId);
|
||||||
goto FAIL_SHUFFLE_DISPATCH;
|
if(code != 0) {
|
||||||
|
destroyDispatchMsg(pReqs, numOfVgroups);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr,
|
pTask->msgInfo.pData = pReqs;
|
||||||
pTask->info.selfChildId, numOfBlocks, vgSz);
|
// *pDispatchReq = pReqs;
|
||||||
|
|
||||||
for (int32_t i = 0; i < vgSz; i++) {
|
// stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroup(s), msgId:%d", pTask->id.idStr,
|
||||||
if (pReqs[i].blockNum > 0) {
|
// pTask->info.selfChildId, numOfBlocks, numOfVgroups, msgId);
|
||||||
|
//
|
||||||
|
// for (int32_t i = 0; i < numOfVgroups; i++) {
|
||||||
|
// if (pReqs[i].blockNum > 0) {
|
||||||
|
// SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
||||||
|
// stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr,
|
||||||
|
// pTask->info.selfChildId, pReqs[i].blockNum, pVgInfo->vgId);
|
||||||
|
//
|
||||||
|
// code = doSendDispatchMsg(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet);
|
||||||
|
// if (code < 0) {
|
||||||
|
// destroyDispatchMsg(pReqs, numOfVgroups);
|
||||||
|
// return code;
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// stDebug("s-task:%s complete shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, numOfVgroups, msgId);
|
||||||
|
// code = 0;
|
||||||
|
//
|
||||||
|
// *pDispatchReq = pReqs;
|
||||||
|
}
|
||||||
|
|
||||||
|
stDebug("s-task:%s build dispatch msg success, msgId:%d", pTask->id.idStr, pTask->taskExecInfo.dispatchCount);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatchMsg) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t msgId = pTask->taskExecInfo.dispatchCount;
|
||||||
|
const char* id = pTask->id.idStr;
|
||||||
|
|
||||||
|
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
|
int32_t vgId = pTask->fixedEpDispatcher.nodeId;
|
||||||
|
SEpSet* pEpSet = &pTask->fixedEpDispatcher.epSet;
|
||||||
|
int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId;
|
||||||
|
|
||||||
|
stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), id:%d", id,
|
||||||
|
pTask->info.selfChildId, 1, downstreamTaskId, vgId, msgId);
|
||||||
|
|
||||||
|
code = doSendDispatchMsg(pTask, pDispatchMsg, vgId, pEpSet);
|
||||||
|
} else {
|
||||||
|
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||||
|
int32_t numOfVgroups = taosArrayGetSize(vgInfo);
|
||||||
|
|
||||||
|
stDebug("s-task:%s (child taskId:%d) start to shuffle-dispatch blocks to %d vgroup(s), msgId:%d",
|
||||||
|
id, pTask->info.selfChildId, numOfVgroups, msgId);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfVgroups; i++) {
|
||||||
|
if (pDispatchMsg[i].blockNum > 0) {
|
||||||
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
||||||
stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr,
|
stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr,
|
||||||
pTask->info.selfChildId, pReqs[i].blockNum, pVgInfo->vgId);
|
pTask->info.selfChildId, pDispatchMsg[i].blockNum, pVgInfo->vgId);
|
||||||
|
|
||||||
code = doSendDispatchMsg(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet);
|
code = doSendDispatchMsg(pTask, &pDispatchMsg[i], pVgInfo->vgId, &pVgInfo->epSet);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
goto FAIL_SHUFFLE_DISPATCH;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
stDebug("s-task:%s complete shuffle-dispatch blocks to all %d vnodes", pTask->id.idStr, vgSz);
|
stDebug("s-task:%s complete shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, numOfVgroups, msgId);
|
||||||
|
|
||||||
code = 0;
|
|
||||||
|
|
||||||
FAIL_SHUFFLE_DISPATCH:
|
|
||||||
for (int32_t i = 0; i < vgSz; i++) {
|
|
||||||
taosArrayDestroyP(pReqs[i].data, taosMemoryFree);
|
|
||||||
taosArrayDestroy(pReqs[i].dataLen);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosMemoryFree(pReqs);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
@ -400,7 +450,7 @@ static void doRetryDispatchData(void* param, void* tmrId) {
|
||||||
|
|
||||||
ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT);
|
ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT);
|
||||||
|
|
||||||
int32_t code = doDispatchAllBlocks(pTask, pTask->msgInfo.pData);
|
int32_t code = sendDispatchMsg(pTask, pTask->msgInfo.pData);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
if (!streamTaskShouldStop(&pTask->status)) {
|
if (!streamTaskShouldStop(&pTask->status)) {
|
||||||
stDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr);
|
stDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr);
|
||||||
|
@ -524,25 +574,31 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTask->msgInfo.pData = pBlock;
|
|
||||||
ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK || pBlock->type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
|
ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK || pBlock->type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
|
||||||
pBlock->type == STREAM_INPUT__TRANS_STATE);
|
pBlock->type == STREAM_INPUT__TRANS_STATE);
|
||||||
|
|
||||||
int32_t retryCount = 0;
|
int32_t retryCount = 0;
|
||||||
|
pTask->taskExecInfo.dispatchCount += 1;
|
||||||
|
|
||||||
|
int32_t code = doBuildDispatchMsg(pTask, pBlock);
|
||||||
|
if (code == 0) {
|
||||||
|
destroyStreamDataBlock(pBlock);
|
||||||
|
} else { // todo handle build dispatch msg failed
|
||||||
|
}
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
int32_t code = doDispatchAllBlocks(pTask, pBlock);
|
code = sendDispatchMsg(pTask, pTask->msgInfo.pData);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
stDebug("s-task:%s failed to dispatch msg to downstream, code:%s, output status:%d, retry cnt:%d", id,
|
stDebug("s-task:%s failed to dispatch msg:%d to downstream, code:%s, output status:%d, retry cnt:%d", id,
|
||||||
tstrerror(terrno), pTask->outputInfo.status, retryCount);
|
pTask->taskExecInfo.dispatchCount, tstrerror(terrno), pTask->outputInfo.status, retryCount);
|
||||||
|
|
||||||
// todo deal with only partially success dispatch case
|
// todo deal with only partially success dispatch case
|
||||||
atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);
|
atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);
|
||||||
if (terrno == TSDB_CODE_APP_IS_STOPPING) { // in case of this error, do not retry anymore
|
if (terrno == TSDB_CODE_APP_IS_STOPPING) { // in case of this error, do not retry anymore
|
||||||
destroyStreamDataBlock(pTask->msgInfo.pData);
|
destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask));
|
||||||
pTask->msgInfo.pData = NULL;
|
pTask->msgInfo.pData = NULL;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -552,6 +608,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
|
||||||
|
|
||||||
stDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d",
|
stDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d",
|
||||||
pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref);
|
pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref);
|
||||||
|
|
||||||
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
|
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -951,7 +1008,7 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) {
|
||||||
|
|
||||||
// this message has been sent successfully, let's try next one.
|
// this message has been sent successfully, let's try next one.
|
||||||
static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) {
|
static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) {
|
||||||
destroyStreamDataBlock(pTask->msgInfo.pData);
|
destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask));
|
||||||
pTask->msgInfo.pData = NULL;
|
pTask->msgInfo.pData = NULL;
|
||||||
|
|
||||||
if (pTask->msgInfo.blockingTs != 0) {
|
if (pTask->msgInfo.blockingTs != 0) {
|
||||||
|
@ -974,6 +1031,7 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId
|
||||||
|
|
||||||
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
|
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
|
int32_t msgId = pTask->taskExecInfo.dispatchCount;
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
// dispatch message failed: network error, or node not available.
|
// dispatch message failed: network error, or node not available.
|
||||||
|
@ -982,14 +1040,14 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
// happened too fast.
|
// happened too fast.
|
||||||
// todo handle the shuffle dispatch failure
|
// todo handle the shuffle dispatch failure
|
||||||
if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore
|
if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore
|
||||||
stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), no retry, since it is destroyed already", id,
|
stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), msgId:%d no retry, since task destroyed already", id,
|
||||||
pRsp->downstreamTaskId, pRsp->downstreamNodeId);
|
pRsp->downstreamTaskId, pRsp->downstreamNodeId, msgId);
|
||||||
|
|
||||||
SStreamDataBlock* pMsgBlock = pTask->msgInfo.pData;
|
// SStreamDataBlock* pMsgBlock = pTask->msgInfo.pData;
|
||||||
if (pMsgBlock->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
|
// if (pMsgBlock->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
|
||||||
stError("s-task:%s checkpoint trigger send failed, continue do checkpoint ready process", id);
|
// stError("s-task:%s checkpoint trigger send failed, continue do checkpoint ready process", id);
|
||||||
streamProcessCheckpointReadyMsg(pTask);
|
// streamProcessCheckpointReadyMsg(pTask);
|
||||||
}
|
// }
|
||||||
|
|
||||||
// we should set the correct finish flag to make sure the shuffle dispatch will be executed completed.
|
// we should set the correct finish flag to make sure the shuffle dispatch will be executed completed.
|
||||||
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
|
@ -1002,11 +1060,23 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), code:%s, retry cnt:%d", id,
|
stError("s-task:%s failed to dispatch msgId:%d to task:0x%x(vgId:%d), code:%s, retry", id, msgId,
|
||||||
pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code), ++pTask->msgInfo.retryCount);
|
pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code));
|
||||||
|
SStreamDispatchReq* pDispatchMsg = pTask->msgInfo.pData;
|
||||||
|
|
||||||
int32_t ret = doDispatchAllBlocks(pTask, pTask->msgInfo.pData);
|
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||||
|
int32_t numOfVgroups = taosArrayGetSize(vgInfo);
|
||||||
|
for(int32_t i = 0; i < numOfVgroups; ++i) {
|
||||||
|
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
||||||
|
if (pVgInfo->vgId == pRsp->downstreamNodeId) {
|
||||||
|
stDebug("s-task:%s (child taskId:%d) re-send blocks:%d to vgId:%d", pTask->id.idStr,
|
||||||
|
pTask->info.selfChildId, pDispatchMsg[i].blockNum, pVgInfo->vgId);
|
||||||
|
code = doSendDispatchMsg(pTask, &pDispatchMsg[i], pVgInfo->vgId, &pVgInfo->epSet);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
sendDispatchMsg(pTask, pTask->msgInfo.pData);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1017,22 +1087,23 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
|
int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
|
||||||
if (leftRsp > 0) {
|
if (leftRsp > 0) {
|
||||||
stDebug("s-task:%s recv dispatch rsp from 0x%x(vgId:%d), downstream task input status:%d code:%d, waiting for %d rsp", id, pRsp->downstreamTaskId,
|
stDebug(
|
||||||
pRsp->downstreamNodeId, pRsp->inputStatus, code, leftRsp);
|
"s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d, waiting for %d "
|
||||||
|
"rsp",
|
||||||
|
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code, leftRsp);
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
stDebug("s-task:%s recv dispatch rsp from 0x%x(vgId:%d), downstream task input status:%d code:%d, all rsp", id,
|
stDebug("s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d, all rsp", id,
|
||||||
pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code);
|
msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
stDebug("s-task:%s recv fix-dispatch rsp from 0x%x(vgId:%d), downstream task input status:%d code:%d", id,
|
stDebug("s-task:%s recv fix-dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d",
|
||||||
pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code);
|
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
// transtate msg has been sent to downstream successfully. let's transfer the fill-history task state
|
// transtate msg has been sent to downstream successfully. let's transfer the fill-history task state
|
||||||
SStreamDataBlock* p = pTask->msgInfo.pData;
|
if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__TRANS_STATE) {
|
||||||
if (p->type == STREAM_INPUT__TRANS_STATE) {
|
stDebug("s-task:%s dispatch transtate msgId:%d to downstream successfully, start to transfer state", id, msgId);
|
||||||
stDebug("s-task:%s dispatch transtate msg to downstream successfully, start to transfer state", id);
|
|
||||||
ASSERT(pTask->info.fillHistory == 1);
|
ASSERT(pTask->info.fillHistory == 1);
|
||||||
|
|
||||||
code = streamTransferStateToStreamTask(pTask);
|
code = streamTransferStateToStreamTask(pTask);
|
||||||
|
@ -1066,7 +1137,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
" wait for %dms and retry dispatch data, total wait:%.2fSec ref:%d",
|
" wait for %dms and retry dispatch data, total wait:%.2fSec ref:%d",
|
||||||
id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS, el, ref);
|
id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS, el, ref);
|
||||||
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
|
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
|
||||||
} else { // pipeline send data in output queue
|
} else {
|
||||||
// this message has been sent successfully, let's try next one.
|
// this message has been sent successfully, let's try next one.
|
||||||
handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId);
|
handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId);
|
||||||
}
|
}
|
||||||
|
|
|
@ -358,8 +358,9 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
||||||
|
|
||||||
pTask->pReadyMsgList = taosArrayDestroy(pTask->pReadyMsgList);
|
pTask->pReadyMsgList = taosArrayDestroy(pTask->pReadyMsgList);
|
||||||
if (pTask->msgInfo.pData != NULL) {
|
if (pTask->msgInfo.pData != NULL) {
|
||||||
destroyStreamDataBlock(pTask->msgInfo.pData);
|
destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask));
|
||||||
pTask->msgInfo.pData = NULL;
|
pTask->msgInfo.pData = NULL;
|
||||||
|
pTask->msgInfo.dispatchMsgType = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->id.idStr != NULL) {
|
if (pTask->id.idStr != NULL) {
|
||||||
|
|
Loading…
Reference in New Issue