fix(stream): add dispatch data monitor to handle the network broken problem that may cause the stream process frozen.

This commit is contained in:
Haojun Liao 2024-06-13 14:56:20 +08:00
parent 6f32a617f7
commit 0f8c0fa8cb
7 changed files with 384 additions and 270 deletions

View File

@ -205,7 +205,6 @@ typedef struct {
typedef struct { typedef struct {
char stbFullName[TSDB_TABLE_FNAME_LEN]; char stbFullName[TSDB_TABLE_FNAME_LEN];
int32_t waitingRspCnt;
SUseDbRsp dbInfo; SUseDbRsp dbInfo;
} STaskDispatcherShuffle; } STaskDispatcherShuffle;
@ -312,15 +311,18 @@ typedef struct SMetaHbInfo SMetaHbInfo;
typedef struct SDispatchMsgInfo { typedef struct SDispatchMsgInfo {
SStreamDispatchReq* pData; // current dispatch data SStreamDispatchReq* pData; // current dispatch data
int8_t dispatchMsgType; int8_t dispatchMsgType;
int64_t checkpointId; // checkpoint id msg int64_t checkpointId; // checkpoint id msg
int32_t transId; // transId for current checkpoint int32_t transId; // transId for current checkpoint
int16_t msgType; // dispatch msg type int16_t msgType; // dispatch msg type
int32_t retryCount; // retry send data count int32_t msgId;
int64_t startTs; // dispatch start time, record total elapsed time for dispatch int64_t startTs; // dispatch start time, record total elapsed time for dispatch
SArray* pRetryList; // current dispatch successfully completed node of downstream int64_t rspTs; // latest rsp time
void* pRetryTmr; // used to dispatch data after a given time duration void* pRetryTmr; // used to dispatch data after a given time duration
void* pRspTmr; // used to dispatch data after a given time duration TdThreadMutex lock;
int8_t inMonitor;
SArray* pSendInfo; // SArray<SDispatchEntry>
} SDispatchMsgInfo; } SDispatchMsgInfo;
typedef struct STaskQueue { typedef struct STaskQueue {

View File

@ -1012,16 +1012,6 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
} }
int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen) {
int32_t vgId = TD_VID(pTq->pVnode);
SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg;
// if (!pTq->pVnode->restored) {
// tqDebug("vgId:%d update-checkpoint-info msg received during restoring, checkpointId:%" PRId64
// ", transId:%d s-task:0x%x ignore it",
// vgId, pReq->checkpointId, pReq->transId, pReq->taskId);
// return TSDB_CODE_SUCCESS;
// }
return tqStreamTaskProcessUpdateCheckpointReq(pTq->pStreamMeta, msg, msgLen); return tqStreamTaskProcessUpdateCheckpointReq(pTq->pStreamMeta, msg, msgLen);
} }

View File

@ -362,6 +362,7 @@ int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
pRsp->upstreamNodeId = htonl(pRsp->upstreamNodeId);
pRsp->upstreamTaskId = htonl(pRsp->upstreamTaskId); pRsp->upstreamTaskId = htonl(pRsp->upstreamTaskId);
pRsp->streamId = htobe64(pRsp->streamId); pRsp->streamId = htobe64(pRsp->streamId);
pRsp->downstreamTaskId = htonl(pRsp->downstreamTaskId); pRsp->downstreamTaskId = htonl(pRsp->downstreamTaskId);
@ -369,6 +370,9 @@ int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
pRsp->stage = htobe64(pRsp->stage); pRsp->stage = htobe64(pRsp->stage);
pRsp->msgId = htonl(pRsp->msgId); pRsp->msgId = htonl(pRsp->msgId);
tqDebug("s-task:0x%x vgId:%d recv dispatch-rsp from 0x%x vgId:%d", pRsp->upstreamTaskId, pRsp->upstreamNodeId,
pRsp->downstreamTaskId, pRsp->downstreamNodeId);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->upstreamTaskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->upstreamTaskId);
if (pTask) { if (pTask) {
streamProcessDispatchRsp(pTask, pRsp, pMsg->code); streamProcessDispatchRsp(pTask, pRsp, pMsg->code);

View File

@ -31,15 +31,11 @@ extern "C" {
#define WAIT_FOR_MINIMAL_INTERVAL 100.00 #define WAIT_FOR_MINIMAL_INTERVAL 100.00
#define MAX_RETRY_LAUNCH_HISTORY_TASK 40 #define MAX_RETRY_LAUNCH_HISTORY_TASK 40
#define RETRY_LAUNCH_INTERVAL_INC_RATE 1.2 #define RETRY_LAUNCH_INTERVAL_INC_RATE 1.2
#define MAX_BLOCK_NAME_NUM 1024 #define MAX_BLOCK_NAME_NUM 1024
#define DISPATCH_RETRY_INTERVAL_MS 300 #define DISPATCH_RETRY_INTERVAL_MS 300
#define MAX_CONTINUE_RETRY_COUNT 5
#define META_HB_CHECK_INTERVAL 200 #define META_HB_CHECK_INTERVAL 200
#define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec #define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec
#define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1) #define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1)
#define STREAM_TASK_QUEUE_CAPACITY 20480 #define STREAM_TASK_QUEUE_CAPACITY 20480
#define STREAM_TASK_QUEUE_CAPACITY_IN_SIZE (30) #define STREAM_TASK_QUEUE_CAPACITY_IN_SIZE (30)
@ -118,6 +114,14 @@ typedef struct {
int32_t taskId; int32_t taskId;
} STaskTriggerSendInfo; } STaskTriggerSendInfo;
typedef struct {
int32_t nodeId;
int32_t status;
int64_t sendTs;
int64_t rspTs;
int32_t retryCount;
} SDispatchEntry;
typedef struct { typedef struct {
int64_t streamId; int64_t streamId;
int64_t recvTs; int64_t recvTs;
@ -143,6 +147,12 @@ typedef enum {
EXEC_AFTER_IDLE = 0x1, EXEC_AFTER_IDLE = 0x1,
} EExtractDataCode; } EExtractDataCode;
typedef enum ECHECKPOINT_BACKUP_TYPE {
DATA_UPLOAD_DISABLE = -1,
DATA_UPLOAD_S3 = 0,
DATA_UPLOAD_RSYNC = 1,
} ECHECKPOINT_BACKUP_TYPE;
extern void* streamTimer; extern void* streamTimer;
extern int32_t streamBackendId; extern int32_t streamBackendId;
extern int32_t streamBackendCfWrapperId; extern int32_t streamBackendCfWrapperId;
@ -153,10 +163,9 @@ void streamTimerCleanUp();
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen); void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen);
void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration); void streamStartMonitorDispatchData(SStreamTask* pTask, int64_t waitDuration);
int32_t streamDispatchStreamBlock(SStreamTask* pTask); int32_t streamDispatchStreamBlock(SStreamTask* pTask);
void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups); void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups);
int32_t getNumOfDispatchBranch(SStreamTask* pTask);
void clearBufferedDispatchMsg(SStreamTask* pTask); void clearBufferedDispatchMsg(SStreamTask* pTask);
int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock); int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock);
@ -204,12 +213,6 @@ int32_t streamQueueGetItemSize(const SStreamQueue* pQueue);
void streamMetaRemoveDB(void* arg, char* key); void streamMetaRemoveDB(void* arg, char* key);
typedef enum ECHECKPOINT_BACKUP_TYPE {
DATA_UPLOAD_DISABLE = -1,
DATA_UPLOAD_S3 = 0,
DATA_UPLOAD_RSYNC = 1,
} ECHECKPOINT_BACKUP_TYPE;
ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType(); ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType();
int32_t streamTaskDownloadCheckpointData(const char* id, char* path); int32_t streamTaskDownloadCheckpointData(const char* id, char* path);

View File

@ -23,13 +23,15 @@ typedef struct SBlockName {
char parTbName[TSDB_TABLE_NAME_LEN]; char parTbName[TSDB_TABLE_NAME_LEN];
} SBlockName; } SBlockName;
static void doRetryDispatchData(void* param, void* tmrId); static void doMonitorDispatchData(void* param, void* tmrId);
static int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet); static int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet);
static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq); static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq);
static int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, static int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock,
int32_t vgSz, int64_t groupId); int64_t groupId, int64_t now);
static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId,
int32_t numOfBlocks, int64_t dstTaskId, int32_t type); int32_t numOfBlocks, int64_t dstTaskId, int32_t type);
static int32_t getFailedDispatchInfo(SDispatchMsgInfo* pMsgInfo, int64_t now);
static bool isDispatchRspTimeout(SDispatchEntry* pEntry, int64_t now);
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) { void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) {
pMsg->msgType = msgType; pMsg->msgType = msgType;
@ -42,7 +44,7 @@ static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTas
pReq->streamId = pTask->id.streamId; pReq->streamId = pTask->id.streamId;
pReq->srcVgId = vgId; pReq->srcVgId = vgId;
pReq->stage = pTask->pMeta->stage; pReq->stage = pTask->pMeta->stage;
pReq->msgId = pTask->execInfo.dispatch; pReq->msgId = pTask->msgInfo.msgId;
pReq->upstreamTaskId = pTask->id.taskId; pReq->upstreamTaskId = pTask->id.taskId;
pReq->upstreamChildId = pTask->info.selfChildId; pReq->upstreamChildId = pTask->info.selfChildId;
pReq->upstreamNodeId = pTask->info.nodeId; pReq->upstreamNodeId = pTask->info.nodeId;
@ -65,6 +67,7 @@ static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTas
void streamTaskSendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp){ void streamTaskSendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp){
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp)); void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp));
((SMsgHead*)buf)->vgId = htonl(pReq->srcNodeId); ((SMsgHead*)buf)->vgId = htonl(pReq->srcNodeId);
SStreamRetrieveRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead)); SStreamRetrieveRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
pCont->streamId = pReq->streamId; pCont->streamId = pReq->streamId;
pCont->rspToTaskId = pReq->srcTaskId; pCont->rspToTaskId = pReq->srcTaskId;
@ -216,26 +219,66 @@ void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
} }
int32_t getNumOfDispatchBranch(SStreamTask* pTask) {
return (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH)
? 1
: taosArrayGetSize(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
}
void clearBufferedDispatchMsg(SStreamTask* pTask) { void clearBufferedDispatchMsg(SStreamTask* pTask) {
SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo; SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo;
if (pMsgInfo->pData != NULL) { if (pMsgInfo->pData != NULL) {
destroyDispatchMsg(pMsgInfo->pData, getNumOfDispatchBranch(pTask)); destroyDispatchMsg(pMsgInfo->pData, streamTaskGetNumOfDownstream(pTask));
} }
pMsgInfo->checkpointId = -1; pMsgInfo->checkpointId = -1;
pMsgInfo->transId = -1; pMsgInfo->transId = -1;
pMsgInfo->pData = NULL; pMsgInfo->pData = NULL;
pMsgInfo->dispatchMsgType = 0; pMsgInfo->dispatchMsgType = 0;
taosThreadMutexLock(&pMsgInfo->lock);
taosArrayClear(pTask->msgInfo.pSendInfo);
taosThreadMutexUnlock(&pMsgInfo->lock);
}
static SStreamDispatchReq* createDispatchDataReq(SStreamTask* pTask, const SStreamDataBlock* pData) {
int32_t code = 0;
int32_t type = pTask->outputInfo.type;
int32_t num = streamTaskGetNumOfDownstream(pTask);
ASSERT(type == TASK_OUTPUT__SHUFFLE_DISPATCH || type == TASK_OUTPUT__FIXED_DISPATCH);
SStreamDispatchReq* pReqs = taosMemoryCalloc(num, sizeof(SStreamDispatchReq));
if (pReqs == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
if (type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgroups = taosArrayGetSize(vgInfo);
for (int32_t i = 0; i < numOfVgroups; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
code = tInitStreamDispatchReq(&pReqs[i], pTask, pData->srcVgId, 0, pVgInfo->taskId, pData->type);
if (code != TSDB_CODE_SUCCESS) {
destroyDispatchMsg(pReqs, numOfVgroups);
terrno = code;
return NULL;
}
}
} else {
int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
code = tInitStreamDispatchReq(pReqs, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, pData->type);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pReqs);
terrno = code;
return NULL;
}
}
return pReqs;
} }
static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pData) { static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pData) {
int32_t code = 0; int32_t code = 0;
int64_t now = taosGetTimestampMs();
int32_t numOfBlocks = taosArrayGetSize(pData->blocks); int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
ASSERT(numOfBlocks != 0 && pTask->msgInfo.pData == NULL); ASSERT(numOfBlocks != 0 && pTask->msgInfo.pData == NULL);
@ -247,48 +290,28 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
pTask->msgInfo.transId = p->info.window.ekey; pTask->msgInfo.transId = p->info.window.ekey;
} }
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { SStreamDispatchReq* pReqs = createDispatchDataReq(pTask, pData);
SStreamDispatchReq* pReq = taosMemoryCalloc(1, sizeof(SStreamDispatchReq)); if (pReqs == NULL) {
stError("s-task:%s failed to create dispatch req", pTask->id.idStr);
int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; return terrno;
code = tInitStreamDispatchReq(pReq, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, pData->type);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pReq);
return code;
} }
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
for (int32_t i = 0; i < numOfBlocks; i++) { for (int32_t i = 0; i < numOfBlocks; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i); SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
code = streamAddBlockIntoDispatchMsg(pDataBlock, pReq); code = streamAddBlockIntoDispatchMsg(pDataBlock, pReqs);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
destroyDispatchMsg(pReq, 1); destroyDispatchMsg(pReqs, 1);
return code; return code;
} }
} }
pTask->msgInfo.pData = pReq; pTask->msgInfo.pData = pReqs;
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
int32_t rspCnt = atomic_load_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt);
ASSERT(rspCnt == 0);
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgroups = taosArrayGetSize(vgInfo); int32_t numOfVgroups = taosArrayGetSize(vgInfo);
SStreamDispatchReq* pReqs = taosMemoryCalloc(numOfVgroups, sizeof(SStreamDispatchReq));
if (pReqs == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int32_t i = 0; i < numOfVgroups; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
code = tInitStreamDispatchReq(&pReqs[i], pTask, pData->srcVgId, 0, pVgInfo->taskId, pData->type);
if (code != TSDB_CODE_SUCCESS) {
destroyDispatchMsg(pReqs, numOfVgroups);
return code;
}
}
for (int32_t i = 0; i < numOfBlocks; i++) { for (int32_t i = 0; i < numOfBlocks; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i); SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
@ -304,7 +327,12 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
// it's a new vnode to receive dispatch msg, so add one // it's a new vnode to receive dispatch msg, so add one
if (pReqs[j].blockNum == 0) { if (pReqs[j].blockNum == 0) {
atomic_add_fetch_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 1); SVgroupInfo* pDstVgroupInfo = taosArrayGet(vgInfo, j);
SDispatchEntry entry = {.nodeId = pDstVgroupInfo->vgId, .rspTs = -1, .status = 0, .sendTs = now};
taosThreadMutexLock(&pTask->msgInfo.lock);
taosArrayPush(pTask->msgInfo.pSendInfo, &entry);
taosThreadMutexUnlock(&pTask->msgInfo.lock);
} }
pReqs[j].blockNum++; pReqs[j].blockNum++;
@ -313,7 +341,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
continue; continue;
} }
code = streamSearchAndAddBlock(pTask, pReqs, pDataBlock, numOfVgroups, pDataBlock->info.id.groupId); code = streamSearchAndAddBlock(pTask, pReqs, pDataBlock, pDataBlock->info.id.groupId, now);
if (code != 0) { if (code != 0) {
destroyDispatchMsg(pReqs, numOfVgroups); destroyDispatchMsg(pReqs, numOfVgroups);
return code; return code;
@ -327,9 +355,9 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
stDebug("s-task:%s build dispatch msg success, msgId:%d, stage:%" PRId64 " %p", pTask->id.idStr, stDebug("s-task:%s build dispatch msg success, msgId:%d, stage:%" PRId64 " %p", pTask->id.idStr,
pTask->execInfo.dispatch, pTask->pMeta->stage, pTask->msgInfo.pData); pTask->execInfo.dispatch, pTask->pMeta->stage, pTask->msgInfo.pData);
} else { } else {
int32_t numOfBranches = taosArrayGetSize(pTask->msgInfo.pSendInfo);
stDebug("s-task:%s build dispatch msg success, msgId:%d, stage:%" PRId64 " dstVgNum:%d %p", pTask->id.idStr, stDebug("s-task:%s build dispatch msg success, msgId:%d, stage:%" PRId64 " dstVgNum:%d %p", pTask->id.idStr,
pTask->execInfo.dispatch, pTask->pMeta->stage, pTask->outputInfo.shuffleDispatcher.waitingRspCnt, pTask->execInfo.dispatch, pTask->pMeta->stage, numOfBranches, pTask->msgInfo.pData);
pTask->msgInfo.pData);
} }
return code; return code;
@ -337,8 +365,8 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatchMsg) { static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatchMsg) {
int32_t code = 0; int32_t code = 0;
int32_t msgId = pTask->execInfo.dispatch;
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int32_t msgId = pTask->msgInfo.msgId;
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
int32_t vgId = pTask->outputInfo.fixedDispatcher.nodeId; int32_t vgId = pTask->outputInfo.fixedDispatcher.nodeId;
@ -352,10 +380,10 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch
} else { } else {
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgroups = taosArrayGetSize(vgInfo); int32_t numOfVgroups = taosArrayGetSize(vgInfo);
int32_t numOfBranches = taosArrayGetSize(pTask->msgInfo.pSendInfo);
int32_t actualVgroups = pTask->outputInfo.shuffleDispatcher.waitingRspCnt;
stDebug("s-task:%s (child taskId:%d) start to shuffle-dispatch blocks to %d/%d vgroup(s), msgId:%d", id, stDebug("s-task:%s (child taskId:%d) start to shuffle-dispatch blocks to %d/%d vgroup(s), msgId:%d", id,
pTask->info.selfChildId, actualVgroups, numOfVgroups, msgId); pTask->info.selfChildId, numOfBranches, numOfVgroups, msgId);
int32_t numOfSend = 0; int32_t numOfSend = 0;
for (int32_t i = 0; i < numOfVgroups; i++) { for (int32_t i = 0; i < numOfVgroups; i++) {
@ -370,7 +398,7 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch
} }
// no need to try remain, all already send. // no need to try remain, all already send.
if (++numOfSend == actualVgroups) { if (++numOfSend == numOfBranches) {
break; break;
} }
} }
@ -382,102 +410,154 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch
return code; return code;
} }
static void doRetryDispatchData(void* param, void* tmrId) { static void setNotInDispatchMonitor(SDispatchMsgInfo* pMsgInfo) {
SStreamTask* pTask = param; taosThreadMutexLock(&pMsgInfo->lock);
const char* id = pTask->id.idStr; pMsgInfo->inMonitor = 0;
int32_t msgId = pTask->execInfo.dispatch; taosThreadMutexUnlock(&pMsgInfo->lock);
if (streamTaskShouldStop(pTask)) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref);
return;
} }
ASSERT(pTask->outputq.status == TASK_OUTPUT_STATUS__WAIT); static void setResendInfo(SDispatchEntry* pEntry, int64_t now) {
pEntry->sendTs = now;
int32_t code = 0; pEntry->rspTs = -1;
pEntry->retryCount += 1;
{ }
SArray* pList = taosArrayDup(pTask->msgInfo.pRetryList, NULL);
taosArrayClear(pTask->msgInfo.pRetryList);
static void doSendFailedDispatch(SStreamTask* pTask, SDispatchEntry* pEntry, int64_t now, const char* pMsg) {
SStreamDispatchReq* pReq = pTask->msgInfo.pData; SStreamDispatchReq* pReq = pTask->msgInfo.pData;
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { int32_t msgId = pTask->msgInfo.msgId;
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgroups = taosArrayGetSize(vgInfo); int32_t numOfVgroups = taosArrayGetSize(vgInfo);
int32_t numOfFailed = taosArrayGetSize(pList); setResendInfo(pEntry, now);
stDebug("s-task:%s (child taskId:%d) retry shuffle-dispatch blocks to %d vgroup(s), msgId:%d", id,
pTask->info.selfChildId, numOfFailed, msgId);
for (int32_t i = 0; i < numOfFailed; i++) {
int32_t vgId = *(int32_t*)taosArrayGet(pList, i);
for (int32_t j = 0; j < numOfVgroups; ++j) { for (int32_t j = 0; j < numOfVgroups; ++j) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j); SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
if (pVgInfo->vgId == vgId) { if (pVgInfo->vgId == pEntry->nodeId) {
stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, int32_t code = doSendDispatchMsg(pTask, &pReq[j], pVgInfo->vgId, &pVgInfo->epSet);
pTask->info.selfChildId, pReq[j].blockNum, pVgInfo->vgId); stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d for %s, msgId:%d, code:%s",
pTask->id.idStr, pTask->info.selfChildId, pReq[j].blockNum, pVgInfo->vgId, pMsg, msgId, tstrerror(code));
code = doSendDispatchMsg(pTask, &pReq[j], pVgInfo->vgId, &pVgInfo->epSet);
if (code < 0) {
break; break;
} }
} }
} }
static void doMonitorDispatchData(void* param, void* tmrId) {
SStreamTask* pTask = param;
const char* id = pTask->id.idStr;
int32_t vgId = pTask->pMeta->vgId;
SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo;
int32_t msgId = pMsgInfo->msgId;
int32_t code = 0;
int64_t now = taosGetTimestampMs();
stDebug("s-task:%s start monitor dispatch data", id);
if (streamTaskShouldStop(pTask)) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref);
setNotInDispatchMonitor(pMsgInfo);
return;
}
// slave task not handle the dispatch, downstream not ready will break the monitor timer
// follower not handle the dispatch rsp
if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stError("s-task:%s vgId:%d follower or downstream not ready, jump out of monitor tmr, ref:%d", id, vgId, ref);
setNotInDispatchMonitor(pMsgInfo);
return;
}
taosThreadMutexLock(&pMsgInfo->lock);
if (pTask->outputq.status == TASK_OUTPUT_STATUS__NORMAL) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s not in dispatch procedure, abort from timer, ref:%d", pTask->id.idStr, ref);
pTask->msgInfo.inMonitor = 0;
taosThreadMutexUnlock(&pMsgInfo->lock);
return;
}
taosThreadMutexUnlock(&pMsgInfo->lock);
int32_t numOfFailed = getFailedDispatchInfo(pMsgInfo, now);
if (numOfFailed == 0) {
stDebug("s-task:%s no error occurs, check again in %dms", id, DISPATCH_RETRY_INTERVAL_MS);
streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
return;
}
{
SStreamDispatchReq* pReq = pTask->msgInfo.pData;
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
stDebug("s-task:%s (child taskId:%d) retry shuffle-dispatch to down streams, msgId:%d", id,
pTask->info.selfChildId, msgId);
int32_t numOfRetry = 0;
for (int32_t i = 0; i < taosArrayGetSize(pTask->msgInfo.pSendInfo); ++i) {
SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, i);
if (pEntry->status == TSDB_CODE_SUCCESS && pEntry->rspTs > 0) {
continue;
}
// downstream not rsp yet beyond threshold that is 10s
if (isDispatchRspTimeout(pEntry, now)) { // not respond yet beyonds 30s, re-send data
doSendFailedDispatch(pTask, pEntry, now, "timeout");
numOfRetry += 1;
continue;
}
// downstream inputQ is closed
if (pEntry->status == TASK_INPUT_STATUS__BLOCKED) {
doSendFailedDispatch(pTask, pEntry, now, "downstream inputQ blocked");
numOfRetry += 1;
continue;
}
// handle other errors
if (pEntry->status != TSDB_CODE_SUCCESS) {
doSendFailedDispatch(pTask, pEntry, now, "downstream error");
numOfRetry += 1;
}
} }
stDebug("s-task:%s complete retry shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, stDebug("s-task:%s complete retry shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr,
numOfFailed, msgId); numOfRetry, msgId);
} else { } else {
int32_t vgId = pTask->outputInfo.fixedDispatcher.nodeId; int32_t dstVgId = pTask->outputInfo.fixedDispatcher.nodeId;
SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet; SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet;
int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), msgId:%d", id, ASSERT(taosArrayGetSize(pTask->msgInfo.pSendInfo) == 1);
pTask->info.selfChildId, 1, downstreamTaskId, vgId, msgId); SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, 0);
code = doSendDispatchMsg(pTask, pReq, vgId, pEpSet); setResendInfo(pEntry, now);
code = doSendDispatchMsg(pTask, pReq, dstVgId, pEpSet);
stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), msgId:%d, code:%s", id,
pTask->info.selfChildId, 1, downstreamTaskId, dstVgId, msgId, tstrerror(code));
}
} }
taosArrayDestroy(pList); if (streamTaskShouldStop(pTask)) {
}
if (code != TSDB_CODE_SUCCESS) {
if (!streamTaskShouldStop(pTask)) {
// stDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr);
// atomic_store_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 0);
if (streamTaskShouldPause(pTask)) {
streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS * 10);
} else {
streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
}
} else {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref); stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref);
} setNotInDispatchMonitor(pMsgInfo);
} else { } else {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
stDebug("s-task:%s send success, jump out of timer, ref:%d", pTask->id.idStr, ref);
} }
} }
void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration) { void streamStartMonitorDispatchData(SStreamTask* pTask, int64_t waitDuration) {
pTask->msgInfo.retryCount++;
stTrace("s-task:%s retry send dispatch data in %" PRId64 "ms, in timer msgId:%d, retryTimes:%d", pTask->id.idStr,
waitDuration, pTask->execInfo.dispatch, pTask->msgInfo.retryCount);
if (pTask->msgInfo.pRetryTmr != NULL) { if (pTask->msgInfo.pRetryTmr != NULL) {
taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamTimer, &pTask->msgInfo.pRetryTmr); taosTmrReset(doMonitorDispatchData, waitDuration, pTask, streamTimer, &pTask->msgInfo.pRetryTmr);
} else { } else {
pTask->msgInfo.pRetryTmr = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamTimer); pTask->msgInfo.pRetryTmr = taosTmrStart(doMonitorDispatchData, waitDuration, pTask, streamTimer);
} }
} }
int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz, int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock,
int64_t groupId) { int64_t groupId, int64_t now) {
uint32_t hashValue = 0; uint32_t hashValue = 0;
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
if (pTask->pNameMap == NULL) { if (pTask->pNameMap == NULL) {
@ -495,10 +575,8 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
} else { } else {
char ctbName[TSDB_TABLE_FNAME_LEN] = {0}; char ctbName[TSDB_TABLE_FNAME_LEN] = {0};
if (pDataBlock->info.parTbName[0]) { if (pDataBlock->info.parTbName[0]) {
if(pTask->subtableWithoutMd5 != 1 && if (pTask->subtableWithoutMd5 != 1 && !isAutoTableName(pDataBlock->info.parTbName) &&
!isAutoTableName(pDataBlock->info.parTbName) && !alreadyAddGroupId(pDataBlock->info.parTbName, groupId) && groupId != 0) {
!alreadyAddGroupId(pDataBlock->info.parTbName, groupId) &&
groupId != 0){
if (pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER) { if (pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
buildCtbNameAddGroupId(NULL, pDataBlock->info.parTbName, groupId); buildCtbNameAddGroupId(NULL, pDataBlock->info.parTbName, groupId);
} else if (pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER) { } else if (pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
@ -508,10 +586,13 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
} else { } else {
buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName); buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName);
} }
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db,
pDataBlock->info.parTbName);
/*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/ /*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/
SUseDbRsp* pDbInfo = &pTask->outputInfo.shuffleDispatcher.dbInfo; SUseDbRsp* pDbInfo = &pTask->outputInfo.shuffleDispatcher.dbInfo;
hashValue = taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix); hashValue =
taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix);
SBlockName bln = {0}; SBlockName bln = {0};
bln.hashValue = hashValue; bln.hashValue = hashValue;
memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName)); memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName));
@ -521,19 +602,24 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
} }
bool found = false; bool found = false;
int32_t numOfVgroups = taosArrayGetSize(vgInfo);
// TODO: optimize search // TODO: optimize search
int32_t j; taosThreadMutexLock(&pTask->msgInfo.lock);
for (j = 0; j < vgSz; j++) {
for (int32_t j = 0; j < numOfVgroups; j++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j); SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
ASSERT(pVgInfo->vgId > 0);
if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) { if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) { if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
taosThreadMutexUnlock(&pTask->msgInfo.lock);
return -1; return -1;
} }
if (pReqs[j].blockNum == 0) { if (pReqs[j].blockNum == 0) {
atomic_add_fetch_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 1); SVgroupInfo* pDstVgroupInfo = taosArrayGet(vgInfo, j);
SDispatchEntry entry = {.nodeId = pDstVgroupInfo->vgId, .rspTs = -1, .status = 0, .sendTs = now};
taosArrayPush(pTask->msgInfo.pSendInfo, &entry);
} }
pReqs[j].blockNum++; pReqs[j].blockNum++;
@ -541,10 +627,28 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
break; break;
} }
} }
taosThreadMutexUnlock(&pTask->msgInfo.lock);
ASSERT(found); ASSERT(found);
return 0; return 0;
} }
static void initDispatchInfo(SDispatchMsgInfo* pInfo, int32_t msgId) {
pInfo->startTs = taosGetTimestampMs();
pInfo->rspTs = -1;
pInfo->msgId = msgId;
}
static void clearDispatchInfo(SDispatchMsgInfo* pInfo) {
pInfo->startTs = -1;
pInfo->msgId = -1;
pInfo->rspTs = -1;
}
static void updateDispatchInfo(SDispatchMsgInfo* pInfo, int64_t recvTs) {
pInfo->rspTs = recvTs;
}
int32_t streamDispatchStreamBlock(SStreamTask* pTask) { int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH ||
pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH)); pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH));
@ -587,7 +691,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
type == STREAM_INPUT__TRANS_STATE); type == STREAM_INPUT__TRANS_STATE);
pTask->execInfo.dispatch += 1; pTask->execInfo.dispatch += 1;
pTask->msgInfo.startTs = taosGetTimestampMs(); initDispatchInfo(&pTask->msgInfo, pTask->execInfo.dispatch);
int32_t code = doBuildDispatchMsg(pTask, pBlock); int32_t code = doBuildDispatchMsg(pTask, pBlock);
if (code == 0) { if (code == 0) {
@ -599,33 +703,20 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
streamTaskInitTriggerDispatchInfo(pTask); streamTaskInitTriggerDispatchInfo(pTask);
} }
int32_t retryCount = 0;
while (1) {
code = sendDispatchMsg(pTask, pTask->msgInfo.pData); code = sendDispatchMsg(pTask, pTask->msgInfo.pData);
if (code == TSDB_CODE_SUCCESS) {
break;
}
stDebug("s-task:%s failed to dispatch msg:%d to downstream, code:%s, output status:%d, retry cnt:%d", id, taosThreadMutexLock(&pTask->msgInfo.lock);
pTask->execInfo.dispatch, tstrerror(terrno), pTask->outputq.status, retryCount); if (pTask->msgInfo.inMonitor == 0) {
// todo deal with only partially success dispatch case
atomic_store_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 0);
if (terrno == TSDB_CODE_APP_IS_STOPPING) { // in case of this error, do not retry anymore
clearBufferedDispatchMsg(pTask);
return code;
}
if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug( stDebug("s-task:%s start dispatch monitor tmr in %dms, ref:%d, dispatch code:%s", id, DISPATCH_RETRY_INTERVAL_MS, ref,
"s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d", tstrerror(code));
pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref); streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
pTask->msgInfo.inMonitor = 1;
} else {
stDebug("s-task:%s already in dispatch monitor tmr", id);
}
streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS); taosThreadMutexUnlock(&pTask->msgInfo.lock);
break;
}
}
// this block can not be deleted until it has been sent to downstream task successfully. // this block can not be deleted until it has been sent to downstream task successfully.
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -818,7 +909,9 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch
ASSERT(dataStrLen > 0); ASSERT(dataStrLen > 0);
void* buf = taosMemoryCalloc(1, dataStrLen); void* buf = taosMemoryCalloc(1, dataStrLen);
if (buf == NULL) return -1; if (buf == NULL) {
return -1;
}
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf; SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
pRetrieve->useconds = 0; pRetrieve->useconds = 0;
@ -1031,23 +1124,6 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId
stDebug("s-task:%s destroy dispatch msg:%p", pTask->id.idStr, pTask->msgInfo.pData); stDebug("s-task:%s destroy dispatch msg:%p", pTask->id.idStr, pTask->msgInfo.pData);
bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER); bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER);
if (delayDispatch) {
taosThreadMutexLock(&pTask->lock);
// we only set the dispatch msg info for current checkpoint trans
if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK &&
pTask->chkInfo.pActiveInfo->activeId == pTask->msgInfo.checkpointId) {
ASSERT(pTask->chkInfo.pActiveInfo->transId == pTask->msgInfo.transId);
stDebug("s-task:%s checkpoint-trigger msg to 0x%x rsp for checkpointId:%" PRId64 " transId:%d confirmed",
pTask->id.idStr, downstreamId, pTask->msgInfo.checkpointId, pTask->msgInfo.transId);
streamTaskSetTriggerDispatchConfirmed(pTask, downstreamNodeId);
} else {
stWarn("s-task:%s checkpoint-trigger msg rsp for checkpointId:%" PRId64 " transId:%d discard, since expired",
pTask->id.idStr, pTask->msgInfo.checkpointId, pTask->msgInfo.transId);
}
taosThreadMutexUnlock(&pTask->lock);
}
clearBufferedDispatchMsg(pTask); clearBufferedDispatchMsg(pTask);
int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs; int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs;
@ -1074,17 +1150,55 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId
return 0; return 0;
} }
static int32_t setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t code, int64_t now, const char* id) {
int32_t numOfRsp = 0;
bool alreadySet = false;
taosThreadMutexLock(&pMsgInfo->lock);
for(int32_t j = 0; j < taosArrayGetSize(pMsgInfo->pSendInfo); ++j) {
SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, j);
if (pEntry->nodeId == vgId) {
ASSERT(!alreadySet);
pEntry->rspTs = now;
pEntry->status = code;
alreadySet = true;
stDebug("s-task:%s record the rps recv, ts:%"PRId64" code:%d, idx:%d", id, now, code, j);
}
if (pEntry->rspTs != -1) {
numOfRsp += 1;
}
}
taosThreadMutexUnlock(&pMsgInfo->lock);
return numOfRsp;
}
bool isDispatchRspTimeout(SDispatchEntry* pEntry, int64_t now) {
return (pEntry->rspTs == -1) && (now - pEntry->sendTs) > 30 * 1000;
}
int32_t getFailedDispatchInfo(SDispatchMsgInfo* pMsgInfo, int64_t now) {
int32_t numOfFailed = 0;
taosThreadMutexLock(&pMsgInfo->lock);
for (int32_t j = 0; j < taosArrayGetSize(pMsgInfo->pSendInfo); ++j) {
SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, j);
if (pEntry->status != TSDB_CODE_SUCCESS || isDispatchRspTimeout(pEntry, now)) {
numOfFailed += 1;
}
}
taosThreadMutexUnlock(&pMsgInfo->lock);
return numOfFailed;
}
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) { int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int32_t vgId = pTask->pMeta->vgId; int32_t vgId = pTask->pMeta->vgId;
int32_t msgId = pTask->execInfo.dispatch; SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo;
int32_t msgId = pMsgInfo->msgId;
#if 0 int64_t now = taosGetTimestampMs();
// for test purpose, build the failure case int32_t totalRsp = 0;
if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER) {
pRsp->inputStatus = TASK_INPUT_STATUS__REFUSED;
}
#endif
// follower not handle the dispatch rsp // follower not handle the dispatch rsp
if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) { if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) {
@ -1109,53 +1223,61 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore
stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), msgId:%d no retry, since task destroyed already", stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), msgId:%d no retry, since task destroyed already",
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, msgId); id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, msgId);
totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, id);
} else { } else {
stError("s-task:%s failed to dispatch msgId:%d to task:0x%x(vgId:%d), code:%s, add to retry list", id, msgId, stError("s-task:%s failed to dispatch msgId:%d to task:0x%x(vgId:%d), code:%s, add to retry list", id, msgId,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code)); pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code));
taosThreadMutexLock(&pTask->lock); totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, code, now, id);
taosArrayPush(pTask->msgInfo.pRetryList, &pRsp->downstreamNodeId);
taosThreadMutexUnlock(&pTask->lock);
} }
} else { // code == 0 } else { // code == 0
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
pTask->inputq.status = TASK_INPUT_STATUS__BLOCKED; pTask->inputq.status = TASK_INPUT_STATUS__BLOCKED;
// block the input of current task, to push pressure to upstream // block the input of current task, to push pressure to upstream
taosThreadMutexLock(&pTask->lock); totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, pRsp->inputStatus, now, id);
taosArrayPush(pTask->msgInfo.pRetryList, &pRsp->downstreamNodeId); stTrace("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for retry dispatch", id,
taosThreadMutexUnlock(&pTask->lock);
stTrace("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for %dms and retry dispatch", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, DISPATCH_RETRY_INTERVAL_MS);
} else if (pRsp->inputStatus == TASK_INPUT_STATUS__REFUSED) {
// todo handle the agg task failure, add test case
if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER &&
pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
stError("s-task:%s failed to dispatch checkpoint-trigger msg, checkpointId:%" PRId64
", set the current checkpoint failed, and send rsp to mnode",
id, pTask->chkInfo.pActiveInfo->activeId);
{ // send checkpoint failure msg to mnode directly
pTask->chkInfo.pActiveInfo->failedId = pTask->chkInfo.pActiveInfo->activeId; // record the latest failed checkpoint id
pTask->chkInfo.pActiveInfo->activeId = pTask->chkInfo.pActiveInfo->activeId;
streamTaskSendCheckpointSourceRsp(pTask);
}
} else {
stError("s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, treat it as success", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId); pRsp->downstreamTaskId, pRsp->downstreamNodeId);
} else {
if (pRsp->inputStatus == TASK_INPUT_STATUS__REFUSED) {
// todo handle the role-changed during checkpoint generation, add test case
stError(
"s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, downstream may become follower or "
"restart already, treat it as success",
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
}
totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, id);
{
bool delayDispatch = (pMsgInfo->dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER);
if (delayDispatch) {
taosThreadMutexLock(&pTask->lock);
// we only set the dispatch msg info for current checkpoint trans
if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK &&
pTask->chkInfo.pActiveInfo->activeId == pMsgInfo->checkpointId) {
ASSERT(pTask->chkInfo.pActiveInfo->transId == pMsgInfo->transId);
stDebug("s-task:%s checkpoint-trigger msg to 0x%x rsp for checkpointId:%" PRId64 " transId:%d confirmed",
pTask->id.idStr, pRsp->downstreamTaskId, pMsgInfo->checkpointId, pMsgInfo->transId);
streamTaskSetTriggerDispatchConfirmed(pTask, pRsp->downstreamNodeId);
} else {
stWarn("s-task:%s checkpoint-trigger msg rsp for checkpointId:%" PRId64
" transId:%d discard, since expired",
pTask->id.idStr, pMsgInfo->checkpointId, pMsgInfo->transId);
}
taosThreadMutexUnlock(&pTask->lock);
}
} }
} }
} }
int32_t leftRsp = 0; int32_t notRsp = taosArrayGetSize(pMsgInfo->pSendInfo) - totalRsp;
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
leftRsp = atomic_sub_fetch_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 1); if (notRsp > 0) {
ASSERT(leftRsp >= 0);
if (leftRsp > 0) {
stDebug( stDebug(
"s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, waiting " "s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, waiting "
"for %d rsp", "for %d rsp",
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code), leftRsp); id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code), notRsp);
} else { } else {
stDebug( stDebug(
"s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, all rsp", "s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, all rsp",
@ -1166,31 +1288,17 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code)); msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code));
} }
ASSERT(leftRsp >= 0);
// all msg rsp already, continue // all msg rsp already, continue
if (leftRsp == 0) { if (notRsp == 0) {
ASSERT(pTask->outputq.status == TASK_OUTPUT_STATUS__WAIT); ASSERT(pTask->outputq.status == TASK_OUTPUT_STATUS__WAIT);
// we need to re-try send dispatch msg to downstream tasks // we need to re-try send dispatch msg to downstream tasks
int32_t numOfFailed = taosArrayGetSize(pTask->msgInfo.pRetryList); int32_t numOfFailed = getFailedDispatchInfo(pMsgInfo, now);
if (numOfFailed > 0) { if (numOfFailed == 0) { // this message has been sent successfully, let's try next one.
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
atomic_store_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, numOfFailed);
stDebug("s-task:%s waiting rsp set to be %d", id, pTask->outputInfo.shuffleDispatcher.waitingRspCnt);
}
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s failed to dispatch msg to downstream, add into timer to retry in %dms, ref:%d",
pTask->id.idStr, DISPATCH_RETRY_INTERVAL_MS, ref);
streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
} else { // this message has been sent successfully, let's try next one.
pTask->msgInfo.retryCount = 0;
// trans-state msg has been sent to downstream successfully. let's transfer the fill-history task state // trans-state msg has been sent to downstream successfully. let's transfer the fill-history task state
if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__TRANS_STATE) { if (pMsgInfo->dispatchMsgType == STREAM_INPUT__TRANS_STATE) {
stDebug("s-task:%s dispatch trans-state msgId:%d to downstream successfully, start to prepare transfer state", id, msgId); stDebug("s-task:%s dispatch trans-state msgId:%d to downstream successfully, start to prepare transfer state",
id, msgId);
ASSERT(pTask->info.fillHistory == 1); ASSERT(pTask->info.fillHistory == 1);
code = streamTransferStatePrepare(pTask); code = streamTransferStatePrepare(pTask);
@ -1312,4 +1420,3 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
return 0; return 0;
} }

View File

@ -643,7 +643,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
} }
if (taosGetTimestampMs() - pTask->status.lastExecTs < MIN_INVOKE_INTERVAL) { if (taosGetTimestampMs() - pTask->status.lastExecTs < MIN_INVOKE_INTERVAL) {
stDebug("s-task:%s invoke with high frequency, idle and retry exec in 50ms", id); stDebug("s-task:%s invoke exec too fast, idle and retry in 50ms", id);
streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL); streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL);
return 0; return 0;
} }

View File

@ -283,10 +283,12 @@ void tFreeStreamTask(SStreamTask* pTask) {
pTask->status.pSM = streamDestroyStateMachine(pTask->status.pSM); pTask->status.pSM = streamDestroyStateMachine(pTask->status.pSM);
streamTaskDestroyUpstreamInfo(&pTask->upstreamInfo); streamTaskDestroyUpstreamInfo(&pTask->upstreamInfo);
pTask->msgInfo.pRetryList = taosArrayDestroy(pTask->msgInfo.pRetryList);
taosMemoryFree(pTask->outputInfo.pTokenBucket); taosMemoryFree(pTask->outputInfo.pTokenBucket);
taosThreadMutexDestroy(&pTask->lock); taosThreadMutexDestroy(&pTask->lock);
pTask->msgInfo.pSendInfo = taosArrayDestroy(pTask->msgInfo.pSendInfo);
taosThreadMutexDestroy(&pTask->msgInfo.lock);
pTask->outputInfo.pNodeEpsetUpdateList = taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList); pTask->outputInfo.pNodeEpsetUpdateList = taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList);
if ((pTask->status.removeBackendFiles) && (pTask->pMeta != NULL)) { if ((pTask->status.removeBackendFiles) && (pTask->pMeta != NULL)) {
@ -373,7 +375,13 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
pTask->pMeta = pMeta; pTask->pMeta = pMeta;
pTask->pMsgCb = pMsgCb; pTask->pMsgCb = pMsgCb;
pTask->msgInfo.pRetryList = taosArrayInit(4, sizeof(int32_t)); pTask->msgInfo.pSendInfo = taosArrayInit(4, sizeof(SDispatchEntry));
if (pTask->msgInfo.pSendInfo == NULL) {
stError("s-task:%s failed to create sendInfo struct for stream task, code:Out of memory", pTask->id.idStr);
return terrno;
}
taosThreadMutexInit(&pTask->msgInfo.lock, NULL);
TdThreadMutexAttr attr = {0}; TdThreadMutexAttr attr = {0};