refactor(stream): refactor send msg.

This commit is contained in:
Haojun Liao 2023-09-22 14:52:18 +08:00
parent 7380151322
commit 898aea5cfb
5 changed files with 157 additions and 79 deletions

View File

@ -290,8 +290,11 @@ typedef struct SSTaskBasicInfo {
int64_t triggerParam; // in msec
} SSTaskBasicInfo;
typedef struct SStreamDispatchReq SStreamDispatchReq;
typedef struct SDispatchMsgInfo {
void* pData; // current dispatch data
SStreamDispatchReq* pData; // current dispatch data
int8_t dispatchMsgType;
int16_t msgType; // dispatch msg type
int32_t retryCount; // retry send data count
int64_t blockingTs; // output blocking timestamp
@ -327,6 +330,7 @@ typedef struct {
int64_t step2Start;
int64_t start;
int32_t updateCount;
int32_t dispatchCount;
int64_t latestUpdateTs;
} STaskExecStatisInfo;
@ -442,7 +446,7 @@ typedef struct {
int32_t taskId;
} SStreamTaskRunReq;
typedef struct {
struct SStreamDispatchReq {
int32_t type;
int64_t stage; // nodeId from upstream task
int64_t streamId;
@ -455,7 +459,7 @@ typedef struct {
int64_t totalLen;
SArray* dataLen; // SArray<int32_t>
SArray* data; // SArray<SRetrieveTableRsp*>
} SStreamDispatchReq;
};
typedef struct {
int64_t streamId;

View File

@ -897,12 +897,12 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
streamMetaReleaseTask(pMeta, pTask);
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
tqDebug("s-task:%s status:%s, stage:%d recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d",
tqDebug("s-task:%s status:%s, stage:%d recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), check_status:%d",
pTask->id.idStr, pStatus, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
} else {
rsp.status = TASK_DOWNSTREAM_NOT_READY;
tqDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64
") from task:0x%x (vgId:%d), rsp status %d",
") from task:0x%x (vgId:%d), rsp check_status %d",
req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
}
}

View File

@ -59,6 +59,8 @@ extern int32_t streamBackendCfWrapperId;
void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration);
int32_t streamDispatchStreamBlock(SStreamTask* pTask);
void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups);
int32_t getNumOfDispatchBranch(SStreamTask* pTask);
int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock);
SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg);

View File

@ -277,59 +277,66 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR
return 0;
}
static int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) {
/**
 * Release an array of dispatch requests and the payload arrays owned by each element.
 *
 * For every request, the entries of `data` are freed with taosMemoryFree before the
 * `data` and `dataLen` arrays themselves are destroyed; finally the request array is freed.
 *
 * @param pReq          array of `numOfVgroups` dispatch requests; may be NULL, in which
 *                      case the call is a no-op.
 * @param numOfVgroups  number of elements in `pReq` (1 for a single fixed-dispatch request).
 */
void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups) {
  // Callers hand over pTask->msgInfo.pData, which is still NULL when no dispatch msg
  // was built; indexing pReq[i] below would dereference a NULL pointer in that case.
  if (pReq == NULL) {
    return;
  }

  for (int32_t i = 0; i < numOfVgroups; i++) {
    taosArrayDestroyP(pReq[i].data, taosMemoryFree);
    taosArrayDestroy(pReq[i].dataLen);
  }

  taosMemoryFree(pReq);
}
/**
 * Number of dispatch branches for the task's output.
 *
 * @param pTask  stream task whose output info is inspected.
 * @return 1 when the output type is TASK_OUTPUT__FIXED_DISPATCH; otherwise the size of
 *         the shuffle dispatcher's vgroup info array.
 */
int32_t getNumOfDispatchBranch(SStreamTask* pTask) {
  if (pTask->outputInfo.type != TASK_OUTPUT__FIXED_DISPATCH) {
    // every non-fixed output type is counted by the vgroup list of the shuffle dispatcher
    return taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
  }

  return 1;
}
static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pData) {
int32_t code = 0;
int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
ASSERT(numOfBlocks != 0);
ASSERT(numOfBlocks != 0 && pTask->msgInfo.pData == NULL);
pTask->msgInfo.dispatchMsgType = pData->type;
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
SStreamDispatchReq req = {0};
SStreamDispatchReq* pReq = taosMemoryCalloc(1, sizeof(SStreamDispatchReq));
int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId;
code = tInitStreamDispatchReq(&req, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, pData->type);
code = tInitStreamDispatchReq(pReq, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, pData->type);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
for (int32_t i = 0; i < numOfBlocks; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
code = streamAddBlockIntoDispatchMsg(pDataBlock, &req);
code = streamAddBlockIntoDispatchMsg(pDataBlock, pReq);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroyP(req.data, taosMemoryFree);
taosArrayDestroy(req.dataLen);
destroyDispatchMsg(pReq, 1);
return code;
}
}
int32_t vgId = pTask->fixedEpDispatcher.nodeId;
SEpSet* pEpSet = &pTask->fixedEpDispatcher.epSet;
stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d)", pTask->id.idStr,
pTask->info.selfChildId, numOfBlocks, downstreamTaskId, vgId);
code = doSendDispatchMsg(pTask, &req, vgId, pEpSet);
taosArrayDestroyP(req.data, taosMemoryFree);
taosArrayDestroy(req.dataLen);
return code;
pTask->msgInfo.pData = pReq;
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
int32_t rspCnt = atomic_load_32(&pTask->shuffleDispatcher.waitingRspCnt);
ASSERT(rspCnt == 0);
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t vgSz = taosArrayGetSize(vgInfo);
int32_t numOfVgroups = taosArrayGetSize(vgInfo);
SStreamDispatchReq* pReqs = taosMemoryCalloc(vgSz, sizeof(SStreamDispatchReq));
SStreamDispatchReq* pReqs = taosMemoryCalloc(numOfVgroups, sizeof(SStreamDispatchReq));
if (pReqs == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int32_t i = 0; i < vgSz; i++) {
for (int32_t i = 0; i < numOfVgroups; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
code = tInitStreamDispatchReq(&pReqs[i], pTask, pData->srcVgId, 0, pVgInfo->taskId, pData->type);
if (code != TSDB_CODE_SUCCESS) {
goto FAIL_SHUFFLE_DISPATCH;
destroyDispatchMsg(pReqs, numOfVgroups);
return code;
}
}
@ -338,52 +345,95 @@ static int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* p
// TODO: do not use broadcast
if (pDataBlock->info.type == STREAM_DELETE_RESULT || pDataBlock->info.type == STREAM_CHECKPOINT || pDataBlock->info.type == STREAM_TRANS_STATE) {
for (int32_t j = 0; j < vgSz; j++) {
if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
goto FAIL_SHUFFLE_DISPATCH;
for (int32_t j = 0; j < numOfVgroups; j++) {
code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]);
if (code != 0) {
destroyDispatchMsg(pReqs, numOfVgroups);
return code;
}
if (pReqs[j].blockNum == 0) {
atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
}
pReqs[j].blockNum++;
}
continue;
}
if (streamSearchAndAddBlock(pTask, pReqs, pDataBlock, vgSz, pDataBlock->info.id.groupId) < 0) {
goto FAIL_SHUFFLE_DISPATCH;
code = streamSearchAndAddBlock(pTask, pReqs, pDataBlock, numOfVgroups, pDataBlock->info.id.groupId);
if(code != 0) {
destroyDispatchMsg(pReqs, numOfVgroups);
return code;
}
}
stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr,
pTask->info.selfChildId, numOfBlocks, vgSz);
pTask->msgInfo.pData = pReqs;
// *pDispatchReq = pReqs;
for (int32_t i = 0; i < vgSz; i++) {
if (pReqs[i].blockNum > 0) {
// stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroup(s), msgId:%d", pTask->id.idStr,
// pTask->info.selfChildId, numOfBlocks, numOfVgroups, msgId);
//
// for (int32_t i = 0; i < numOfVgroups; i++) {
// if (pReqs[i].blockNum > 0) {
// SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
// stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr,
// pTask->info.selfChildId, pReqs[i].blockNum, pVgInfo->vgId);
//
// code = doSendDispatchMsg(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet);
// if (code < 0) {
// destroyDispatchMsg(pReqs, numOfVgroups);
// return code;
// }
// }
// }
//
// stDebug("s-task:%s complete shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, numOfVgroups, msgId);
// code = 0;
//
// *pDispatchReq = pReqs;
}
stDebug("s-task:%s build dispatch msg success, msgId:%d", pTask->id.idStr, pTask->taskExecInfo.dispatchCount);
return code;
}
static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatchMsg) {
int32_t code = 0;
int32_t msgId = pTask->taskExecInfo.dispatchCount;
const char* id = pTask->id.idStr;
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
int32_t vgId = pTask->fixedEpDispatcher.nodeId;
SEpSet* pEpSet = &pTask->fixedEpDispatcher.epSet;
int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId;
stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), id:%d", id,
pTask->info.selfChildId, 1, downstreamTaskId, vgId, msgId);
code = doSendDispatchMsg(pTask, pDispatchMsg, vgId, pEpSet);
} else {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgroups = taosArrayGetSize(vgInfo);
stDebug("s-task:%s (child taskId:%d) start to shuffle-dispatch blocks to %d vgroup(s), msgId:%d",
id, pTask->info.selfChildId, numOfVgroups, msgId);
for (int32_t i = 0; i < numOfVgroups; i++) {
if (pDispatchMsg[i].blockNum > 0) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr,
pTask->info.selfChildId, pReqs[i].blockNum, pVgInfo->vgId);
pTask->info.selfChildId, pDispatchMsg[i].blockNum, pVgInfo->vgId);
code = doSendDispatchMsg(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet);
code = doSendDispatchMsg(pTask, &pDispatchMsg[i], pVgInfo->vgId, &pVgInfo->epSet);
if (code < 0) {
goto FAIL_SHUFFLE_DISPATCH;
break;
}
}
}
stDebug("s-task:%s complete shuffle-dispatch blocks to all %d vnodes", pTask->id.idStr, vgSz);
code = 0;
FAIL_SHUFFLE_DISPATCH:
for (int32_t i = 0; i < vgSz; i++) {
taosArrayDestroyP(pReqs[i].data, taosMemoryFree);
taosArrayDestroy(pReqs[i].dataLen);
}
taosMemoryFree(pReqs);
stDebug("s-task:%s complete shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, numOfVgroups, msgId);
}
return code;
@ -400,7 +450,7 @@ static void doRetryDispatchData(void* param, void* tmrId) {
ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT);
int32_t code = doDispatchAllBlocks(pTask, pTask->msgInfo.pData);
int32_t code = sendDispatchMsg(pTask, pTask->msgInfo.pData);
if (code != TSDB_CODE_SUCCESS) {
if (!streamTaskShouldStop(&pTask->status)) {
stDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr);
@ -524,25 +574,31 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
return 0;
}
pTask->msgInfo.pData = pBlock;
ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK || pBlock->type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
pBlock->type == STREAM_INPUT__TRANS_STATE);
int32_t retryCount = 0;
pTask->taskExecInfo.dispatchCount += 1;
int32_t code = doBuildDispatchMsg(pTask, pBlock);
if (code == 0) {
destroyStreamDataBlock(pBlock);
} else { // todo handle build dispatch msg failed
}
while (1) {
int32_t code = doDispatchAllBlocks(pTask, pBlock);
code = sendDispatchMsg(pTask, pTask->msgInfo.pData);
if (code == TSDB_CODE_SUCCESS) {
break;
}
stDebug("s-task:%s failed to dispatch msg to downstream, code:%s, output status:%d, retry cnt:%d", id,
tstrerror(terrno), pTask->outputInfo.status, retryCount);
stDebug("s-task:%s failed to dispatch msg:%d to downstream, code:%s, output status:%d, retry cnt:%d", id,
pTask->taskExecInfo.dispatchCount, tstrerror(terrno), pTask->outputInfo.status, retryCount);
// todo deal with only partially success dispatch case
atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);
if (terrno == TSDB_CODE_APP_IS_STOPPING) { // in case of this error, do not retry anymore
destroyStreamDataBlock(pTask->msgInfo.pData);
destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask));
pTask->msgInfo.pData = NULL;
return code;
}
@ -552,6 +608,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
stDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d",
pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref);
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
break;
}
@ -951,7 +1008,7 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) {
// this message has been sent successfully, let's try next one.
static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) {
destroyStreamDataBlock(pTask->msgInfo.pData);
destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask));
pTask->msgInfo.pData = NULL;
if (pTask->msgInfo.blockingTs != 0) {
@ -974,6 +1031,7 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
const char* id = pTask->id.idStr;
int32_t msgId = pTask->taskExecInfo.dispatchCount;
if (code != TSDB_CODE_SUCCESS) {
// dispatch message failed: network error, or node not available.
@ -982,14 +1040,14 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
// happened too fast.
// todo handle the shuffle dispatch failure
if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore
stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), no retry, since it is destroyed already", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId);
stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), msgId:%d no retry, since task destroyed already", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, msgId);
SStreamDataBlock* pMsgBlock = pTask->msgInfo.pData;
if (pMsgBlock->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
stError("s-task:%s checkpoint trigger send failed, continue do checkpoint ready process", id);
streamProcessCheckpointReadyMsg(pTask);
}
// SStreamDataBlock* pMsgBlock = pTask->msgInfo.pData;
// if (pMsgBlock->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
// stError("s-task:%s checkpoint trigger send failed, continue do checkpoint ready process", id);
// streamProcessCheckpointReadyMsg(pTask);
// }
// we should set the correct finish flag to make sure the shuffle dispatch will be executed completed.
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
@ -1002,11 +1060,23 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
}
}
} else {
stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), code:%s, retry cnt:%d", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code), ++pTask->msgInfo.retryCount);
stError("s-task:%s failed to dispatch msgId:%d to task:0x%x(vgId:%d), code:%s, retry", id, msgId,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code));
SStreamDispatchReq* pDispatchMsg = pTask->msgInfo.pData;
int32_t ret = doDispatchAllBlocks(pTask, pTask->msgInfo.pData);
if (ret != TSDB_CODE_SUCCESS) {
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgroups = taosArrayGetSize(vgInfo);
for(int32_t i = 0; i < numOfVgroups; ++i) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
if (pVgInfo->vgId == pRsp->downstreamNodeId) {
stDebug("s-task:%s (child taskId:%d) re-send blocks:%d to vgId:%d", pTask->id.idStr,
pTask->info.selfChildId, pDispatchMsg[i].blockNum, pVgInfo->vgId);
code = doSendDispatchMsg(pTask, &pDispatchMsg[i], pVgInfo->vgId, &pVgInfo->epSet);
}
}
} else {
sendDispatchMsg(pTask, pTask->msgInfo.pData);
}
}
@ -1017,22 +1087,23 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
if (leftRsp > 0) {
stDebug("s-task:%s recv dispatch rsp from 0x%x(vgId:%d), downstream task input status:%d code:%d, waiting for %d rsp", id, pRsp->downstreamTaskId,
pRsp->downstreamNodeId, pRsp->inputStatus, code, leftRsp);
stDebug(
"s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d, waiting for %d "
"rsp",
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code, leftRsp);
return 0;
} else {
stDebug("s-task:%s recv dispatch rsp from 0x%x(vgId:%d), downstream task input status:%d code:%d, all rsp", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code);
stDebug("s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d, all rsp", id,
msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code);
}
} else {
stDebug("s-task:%s recv fix-dispatch rsp from 0x%x(vgId:%d), downstream task input status:%d code:%d", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code);
stDebug("s-task:%s recv fix-dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d",
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code);
}
// transtate msg has been sent to downstream successfully. let's transfer the fill-history task state
SStreamDataBlock* p = pTask->msgInfo.pData;
if (p->type == STREAM_INPUT__TRANS_STATE) {
stDebug("s-task:%s dispatch transtate msg to downstream successfully, start to transfer state", id);
if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__TRANS_STATE) {
stDebug("s-task:%s dispatch transtate msgId:%d to downstream successfully, start to transfer state", id, msgId);
ASSERT(pTask->info.fillHistory == 1);
code = streamTransferStateToStreamTask(pTask);
@ -1066,7 +1137,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
" wait for %dms and retry dispatch data, total wait:%.2fSec ref:%d",
id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS, el, ref);
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
} else { // pipeline send data in output queue
} else {
// this message has been sent successfully, let's try next one.
handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId);
}

View File

@ -358,8 +358,9 @@ void tFreeStreamTask(SStreamTask* pTask) {
pTask->pReadyMsgList = taosArrayDestroy(pTask->pReadyMsgList);
if (pTask->msgInfo.pData != NULL) {
destroyStreamDataBlock(pTask->msgInfo.pData);
destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask));
pTask->msgInfo.pData = NULL;
pTask->msgInfo.dispatchMsgType = 0;
}
if (pTask->id.idStr != NULL) {