enh(stream): generate the checkpoint framework.

This commit is contained in:
Haojun Liao 2023-07-07 16:07:52 +08:00
parent 61f3585714
commit 47877898d0
16 changed files with 669 additions and 519 deletions

View File

@ -48,6 +48,7 @@ enum {
TASK_STATUS__HALT, // stream task will handle all data in the input queue, and then paused
TASK_STATUS__PAUSE,
TASK_STATUS__CK, // stream task is in checkpoint status, no data are allowed to put into inputQ anymore
TASK_STATUS__CK_READY,
};
enum {
@ -310,9 +311,8 @@ struct SStreamTask {
SStreamId historyTaskId;
SStreamId streamTaskId;
SArray* pUpstreamEpInfoList; // SArray<SStreamChildEpInfo*>, // children info
int32_t nextCheckId;
SArray* checkpointInfo; // SArray<SStreamCheckpointInfo>
SArray* pRpcMsgList; // SArray<SRpcMsg*>
// output
union {
STaskDispatcherFixedEp fixedEpDispatcher;
@ -343,6 +343,7 @@ struct SStreamTask {
int32_t refCnt;
int64_t checkpointingId;
int32_t checkpointAlignCnt;
int32_t checkpointNotReadyTasks;
struct SStreamMeta* pMeta;
SSHashObj* pNameMap;
};
@ -366,7 +367,8 @@ typedef struct SStreamMeta {
SHashObj* pTaskBackendUnique;
TdThreadMutex backendMutex;
int64_t checkpointTs;
// uint64_t checkpointId;
int32_t notCkptReadyTasks;
SArray* checkpointSaved;
SArray* checkpointInUse;
int32_t checkpointCap;
@ -486,8 +488,8 @@ typedef struct {
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq);
int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq);
int32_t tEncodeSStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp);
int32_t tDecodeSStreamCheckpointSourceRsp(SDecoder* pDecoder, SStreamCheckpointSourceRsp* pRsp);
int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp);
int32_t tDecodeStreamCheckpointSourceRsp(SDecoder* pDecoder, SStreamCheckpointSourceRsp* pRsp);
typedef struct {
SMsgHead msgHead;
@ -509,13 +511,13 @@ typedef struct {
int32_t upstreamTaskId;
int32_t upstreamNodeId;
int32_t childId;
} SStreamCheckpointRsp;
} SStreamTaskCheckpointRsp;
int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq);
int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq);
int32_t tEncodeStreamCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq);
int32_t tDecodeStreamCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq);
int32_t tEncodeStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointRsp* pRsp);
int32_t tDecodeStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRsp);
int32_t tEncodeStreamCheckpointRsp(SEncoder* pEncoder, const SStreamTaskCheckpointRsp* pRsp);
int32_t tDecodeStreamCheckpointRsp(SDecoder* pDecoder, SStreamTaskCheckpointRsp* pRsp);
typedef struct {
int64_t streamId;
@ -536,12 +538,6 @@ int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq)
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp);
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp);
int32_t tEncodeSStreamTaskScanHistoryReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq);
int32_t tDecodeSStreamTaskScanHistoryReq(SDecoder* pDecoder, SStreamRecoverDownstreamReq* pReq);
int32_t tEncodeSStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamRecoverDownstreamRsp* pRsp);
int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstreamRsp* pRsp);
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq);
@ -557,7 +553,6 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code);
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
// int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);
void streamTaskInputFail(SStreamTask* pTask);
int32_t streamTryExec(SStreamTask* pTask);
@ -627,7 +622,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver);
// checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTaskCheckpointReq* pReq);
int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointRsp* pRsp);
int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTaskCheckpointRsp* pRsp);
int32_t streamTaskReleaseState(SStreamTask* pTask);
int32_t streamTaskReloadState(SStreamTask* pTask);

View File

@ -80,6 +80,7 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
code = 0;
_OVER:

View File

@ -750,7 +750,8 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -220,6 +220,8 @@ int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
int tqUnregisterPushHandle(STQ* pTq, void* pHandle);
int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed.
int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen);
int32_t tqProcessStreamCheckPointReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen);
int32_t tqProcessStreamCheckPointRsp(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen);
int32_t tqCheckStreamStatus(STQ* pTq);
int tqCommit(STQ*);

View File

@ -217,55 +217,6 @@ void tqNotifyClose(STQ* pTq) {
}
}
//static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
// int64_t consumerId, int32_t type) {
// int32_t len = 0;
// int32_t code = 0;
//
// if (type == TMQ_MSG_TYPE__POLL_DATA_RSP) {
// tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
// } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
// tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
// }
//
// if (code < 0) {
// return -1;
// }
//
// int32_t tlen = sizeof(SMqRspHead) + len;
// void* buf = rpcMallocCont(tlen);
// if (buf == NULL) {
// return -1;
// }
//
// ((SMqRspHead*)buf)->mqMsgType = type;
// ((SMqRspHead*)buf)->epoch = epoch;
// ((SMqRspHead*)buf)->consumerId = consumerId;
//
// void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
//
// SEncoder encoder = {0};
// tEncoderInit(&encoder, abuf, len);
//
// if (type == TMQ_MSG_TYPE__POLL_DATA_RSP) {
// tEncodeMqDataRsp(&encoder, pRsp);
// } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
// tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp);
// }
//
// tEncoderClear(&encoder);
//
// SRpcMsg rsp = {
// .info = *pRpcHandleInfo,
// .pCont = buf,
// .contLen = tlen,
// .code = 0,
// };
//
// tmsgSendRsp(&rsp);
// return 0;
//}
int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) {
SMqDataRsp dataRsp = {0};
dataRsp.head.consumerId = pHandle->consumerId;
@ -948,6 +899,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
}
// todo extract method
SEncoder encoder;
int32_t code;
int32_t len;
@ -973,7 +925,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
}
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) {
char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
int32_t code;
@ -982,7 +934,6 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) {
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)pReq, len);
code = tDecodeStreamTaskCheckRsp(&decoder, &rsp);
if (code < 0) {
tDecoderClear(&decoder);
return -1;
@ -1295,26 +1246,27 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
}
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
if (pTask != NULL) {
// even in halt status, the data in inputQ must be processed
int8_t status = pTask->status.taskStatus;
if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__HALT) {
tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr,
pTask->chkInfo.version);
streamProcessRunReq(pTask);
} else {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
}
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
tqStartStreamTasks(pTq);
return 0;
} else {
if (pTask == NULL) {
tqError("vgId:%d failed to found s-task, taskId:0x%x", vgId, taskId);
return -1;
}
// even in halt status, the data in inputQ must be processed
int8_t status = pTask->status.taskStatus;
if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__HALT) {
tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr,
pTask->chkInfo.version);
streamProcessRunReq(pTask);
} else {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
}
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
tqStartStreamTasks(pTq);
return 0;
}
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
@ -1554,16 +1506,23 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, int64_t sversion, char* pMs
if (tDecodeStreamCheckpointSourceReq(&decoder, &req) < 0) {
code = TSDB_CODE_MSG_DECODE_ERROR;
tDecoderClear(&decoder);
tqError("vgId:%d failed to decode checkpoint source msg, code:%s", vgId, tstrerror(code));
goto FAIL;
}
tDecoderClear(&decoder);
// todo handle this bug: task not in ready state.
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.taskId);
if (pTask == NULL) {
tqError("vgId:%d failed to find s-task:0x%x , it may have been destroyed already", vgId, req.taskId);
tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed already", vgId, req.taskId);
goto FAIL;
}
// backup the rpchandle for rsp
SRpcMsg* pRpcMsg = taosMemoryCalloc(1, sizeof(SRpcMsg));
memcpy(pRpcMsg, (SRpcMsg*)pMsg, sizeof(SRpcMsg));
taosArrayPush(pTask->pRpcMsgList, &pRpcMsg);
streamProcessCheckpointSourceReq(pMeta, pTask, &req);
streamMetaReleaseTask(pMeta, pTask);
return code;
@ -1571,3 +1530,70 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, int64_t sversion, char* pMs
FAIL:
return code;
}
int32_t tqProcessStreamCheckPointReq(STQ* pTq, int64_t sversion, char* pMsg, int32_t msgLen) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
char* msg = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
int32_t len = msgLen - sizeof(SMsgHead);
int32_t code = 0;
SStreamTaskCheckpointReq req= {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, len);
if (tDecodeStreamCheckpointReq(&decoder, &req) < 0) {
code = TSDB_CODE_MSG_DECODE_ERROR;
tDecoderClear(&decoder);
goto FAIL;
}
tDecoderClear(&decoder);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.downstreamTaskId);
if (pTask == NULL) {
tqError("vgId:%d failed to find s-task:0x%x , it may have been destroyed already", vgId, req.downstreamTaskId);
goto FAIL;
}
streamProcessCheckpointReq(pMeta, pTask, &req);
streamMetaReleaseTask(pMeta, pTask);
return code;
FAIL:
return code;
}
// downstream task has complete the stream task checkpoint procedure
int32_t tqProcessStreamCheckPointRsp(STQ* pTq, int64_t sversion, char* pMsg, int32_t msgLen) {
// if this task is an agg task, rsp this message to upstream directly.
// if this task is an source task, send source rsp to mnode
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
char* msg = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
int32_t len = msgLen - sizeof(SMsgHead);
int32_t code = 0;
SStreamTaskCheckpointRsp req= {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, len);
if (tDecodeStreamCheckpointRsp(&decoder, &req) < 0) {
code = TSDB_CODE_MSG_DECODE_ERROR;
tDecoderClear(&decoder);
goto FAIL;
}
tDecoderClear(&decoder);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.downstreamTaskId);
if (pTask == NULL) {
tqError("vgId:%d failed to find s-task:0x%x , it may have been destroyed already", vgId, req.downstreamTaskId);
goto FAIL;
}
streamProcessCheckpointRsp(pMeta, pTask, &req);
streamMetaReleaseTask(pMeta, pTask);
return code;
FAIL:
return code;
}

View File

@ -503,6 +503,17 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
goto _err;
}
} break;
case TDMT_STREAM_TASK_CHECKPOINT: {
if (tqProcessStreamCheckPointReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
goto _err;
}
} break;
case TDMT_STREAM_TASK_CHECKPOINT_RSP: {
if (tqProcessStreamCheckPointRsp(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
goto _err;
}
} break;
case TDMT_VND_ALTER_CONFIRM:
needCommit = pVnode->config.hashChange;
if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) {

View File

@ -47,7 +47,7 @@ typedef struct {
void* streamBackendInit(const char* path);
void streamBackendCleanup(void* arg);
void streamBackendHandleCleanup(void* arg);
int32_t streamBackendDoCheckpoint(void* pMeta, const char* path);
int32_t streamBackendDoCheckpoint(void* pMeta, uint64_t checkpointId);
SListNode* streamBackendAddCompare(void* backend, void* arg);
void streamBackendDelCompare(void* backend, void* arg);

View File

@ -32,11 +32,13 @@ typedef struct {
} SStreamGlobalEnv;
extern SStreamGlobalEnv streamEnv;
extern int32_t streamBackendId;
extern int32_t streamBackendCfWrapperId;
void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration);
int32_t streamDispatchStreamBlock(SStreamTask* pTask);
SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg);
SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg);
SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize,
SArray* pRes);
void destroyStreamDataBlock(SStreamDataBlock* pBlock);
@ -49,15 +51,12 @@ int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* p
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData);
int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamTaskCheckpointReq* pReq, int32_t nodeId, SEpSet* pEpSet);
int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask, int32_t vgId);
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask, int32_t vgId);
int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId,
SEpSet* pEpSet);
int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, const char* id);
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
extern int32_t streamBackendId;
extern int32_t streamBackendCfWrapperId;
#ifdef __cplusplus
}
#endif

View File

@ -146,7 +146,7 @@ int32_t streamSchedExec(SStreamTask* pTask) {
int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
int8_t status = 0;
SStreamDataBlock* pBlock = createStreamDataFromDispatchMsg(pReq, STREAM_INPUT__DATA_BLOCK, pReq->dataSrcVgId);
SStreamDataBlock* pBlock = createStreamBlockFromDispatchMsg(pReq, STREAM_INPUT__DATA_BLOCK, pReq->dataSrcVgId);
if (pBlock == NULL) {
streamTaskInputFail(pTask);
status = TASK_INPUT_STATUS__FAILED;

View File

@ -401,20 +401,29 @@ int32_t delObsoleteCheckpoint(void* arg, const char* path) {
taosArrayDestroy(checkpointDel);
return 0;
}
int32_t streamBackendDoCheckpoint(void* arg, const char* path) {
int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
SStreamMeta* pMeta = arg;
int64_t backendRid = pMeta->streamBackendRid;
int64_t checkpointId = pMeta->checkpointTs;
int64_t st = taosGetTimestampMs();
int32_t code = -1;
SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid);
char path[256] = {0};
sprintf(path, "%s/%s", pMeta->path, "checkpoints");
code = taosMulModeMkDir(path, 0755);
if (code != 0) {
qError("failed to prepare checkpoint dir, path:%s, reason:%s", path, tstrerror(code));
return code;
}
char checkpointDir[256] = {0};
sprintf(checkpointDir, "%s/checkpoint_%" PRId64 "", path, checkpointId);
snprintf(checkpointDir, tListLen(checkpointDir),"%s/checkpoint_%" PRIu64, path, checkpointId);
SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid);
if (pHandle == NULL) {
return -1;
}
qDebug("stream backend:%p start to do checkpoint at:%s ", pHandle, path);
if (pHandle->db != NULL) {
char* err = NULL;
@ -442,10 +451,12 @@ int32_t streamBackendDoCheckpoint(void* arg, const char* path) {
taosWUnLockLatch(&pMeta->checkpointDirLock);
delObsoleteCheckpoint(arg, path);
_ERROR:
taosReleaseRef(streamBackendId, backendRid);
return code;
}
SListNode* streamBackendAddCompare(void* backend, void* arg) {
SBackendWrapper* pHandle = (SBackendWrapper*)backend;
SListNode* node = NULL;

View File

@ -37,7 +37,7 @@ int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSo
return 0;
}
int32_t tEncodeSStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp) {
int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->checkpointId) < 0) return -1;
@ -48,7 +48,7 @@ int32_t tEncodeSStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheck
return pEncoder->pos;
}
int32_t tDecodeSStreamCheckpointSourceRsp(SDecoder* pDecoder, SStreamCheckpointSourceRsp* pRsp) {
int32_t tDecodeStreamCheckpointSourceRsp(SDecoder* pDecoder, SStreamCheckpointSourceRsp* pRsp) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->checkpointId) < 0) return -1;
@ -59,7 +59,7 @@ int32_t tDecodeSStreamCheckpointSourceRsp(SDecoder* pDecoder, SStreamCheckpointS
return 0;
}
int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) {
int32_t tEncodeStreamCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1;
@ -72,7 +72,7 @@ int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskChec
return pEncoder->pos;
}
int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq) {
int32_t tDecodeStreamCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1;
@ -85,7 +85,7 @@ int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpoint
return 0;
}
int32_t tEncodeStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointRsp* pRsp) {
int32_t tEncodeStreamCheckpointRsp(SEncoder* pEncoder, const SStreamTaskCheckpointRsp* pRsp) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->checkpointId) < 0) return -1;
@ -98,7 +98,7 @@ int32_t tEncodeStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointRs
return pEncoder->pos;
}
int32_t tDecodeStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRsp) {
int32_t tDecodeStreamCheckpointRsp(SDecoder* pDecoder, SStreamTaskCheckpointRsp* pRsp) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->checkpointId) < 0) return -1;
@ -112,13 +112,14 @@ int32_t tDecodeStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRs
}
static int32_t streamAlignCheckpoint(SStreamTask* pTask, int64_t checkpointId, int32_t childId) {
if (pTask->checkpointingId == 0) {
int32_t num = taosArrayGetSize(pTask->pUpstreamEpInfoList);
int64_t old = atomic_val_compare_exchange_64(&pTask->checkpointingId, 0, num);
if (old == 0) {
qDebug("s-task:%s set initial align upstream num:%d", pTask->id.idStr, num);
pTask->checkpointingId = checkpointId;
pTask->checkpointAlignCnt = taosArrayGetSize(pTask->pUpstreamEpInfoList);
}
ASSERT(pTask->checkpointingId == checkpointId);
return atomic_sub_fetch_32(&pTask->checkpointAlignCnt, 1);
}
@ -159,77 +160,49 @@ static int32_t streamTaskDispatchCheckpointMsg(SStreamTask* pTask, uint64_t chec
pTask->info.nodeId, req.downstreamTaskId, req.downstreamNodeId, i);
streamDispatchCheckpointMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
}
} else {
qDebug("s-task:%s (vgId:%d) sink task set to be ready for checkpointing", pTask->id.idStr, pTask->info.nodeId);
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SINK);
streamTaskLaunchScanHistory(pTask);
}
return 0;
}
// set status check pointing
// do checkpoint
static int32_t streamDoSourceCheckpoint(SStreamMeta* pMeta, SStreamTask* pTask, uint64_t checkpointId) {
int code = 0;
char buf[256] = {0};
int64_t ts = taosGetTimestampMs();
sprintf(buf, "%s/%s", pMeta->path, "checkpoints");
code = taosMulModeMkDir(buf, 0755);
if (code != 0) {
qError("failed to prepare checkpoint %s, checkpointId:%" PRIu64 ", reason:%s", buf, checkpointId, tstrerror(code));
return code;
static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask) {
SStreamCheckpoint* pChkpoint = taosAllocateQitem(sizeof(SStreamCheckpoint), DEF_QITEM, sizeof(SSDataBlock));
if (pChkpoint == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pMeta->checkpointTs = ts;
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
// 1. set task status to be prepared for check point
pTask->status.taskStatus = TASK_STATUS__CK;
// 2. put the checkpoint data block into the inputQ, to enable the local status to be flushed to storage backend
{
SStreamCheckpoint* pChkpoint = taosAllocateQitem(sizeof(SStreamCheckpoint), DEF_QITEM, sizeof(SSDataBlock));
if (pChkpoint == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pChkpoint->type = STREAM_INPUT__CHECKPOINT;
pChkpoint->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (pChkpoint->pBlock == NULL) {
taosFreeQitem(pChkpoint);
return TSDB_CODE_OUT_OF_MEMORY;
}
pChkpoint->pBlock->info.type = STREAM_CHECKPOINT;
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pChkpoint) < 0) {
taosFreeQitem(pChkpoint);
return TSDB_CODE_OUT_OF_MEMORY;
}
streamSchedExec(pTask);
pChkpoint->type = STREAM_INPUT__CHECKPOINT;
pChkpoint->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (pChkpoint->pBlock == NULL) {
taosFreeQitem(pChkpoint);
return TSDB_CODE_OUT_OF_MEMORY;
}
// 2. dispatch checkpoint msg to downstream task
streamTaskDispatchCheckpointMsg(pTask, checkpointId);
pChkpoint->pBlock->info.type = STREAM_CHECKPOINT;
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pChkpoint) < 0) {
taosFreeQitem(pChkpoint);
return TSDB_CODE_OUT_OF_MEMORY;
}
// code = streamBackendDoCheckpoint((void*)pMeta, buf);
return code;
streamSchedExec(pTask);
return TSDB_CODE_SUCCESS;
}
int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq) {
int32_t code = 0;
int64_t checkpointId = pReq->checkpointId;
code = streamDoSourceCheckpoint(pMeta, pTask, checkpointId);
if (code < 0) {
// rsp error
return -1;
}
qDebug("s-task:%s level:%d receive the checkpoint msg id:%" PRId64 " from mnode", pTask->id.idStr,
pTask->info.taskLevel, checkpointId);
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
return 0;
// 1. set task status to be prepared for check point, no data are allowed to put into inputQ.
pTask->status.taskStatus = TASK_STATUS__CK;
pTask->checkpointNotReadyTasks = 1;
// 2. let's dispatch checkpoint msg to downstream task directly and do nothing else.
streamTaskDispatchCheckpointMsg(pTask, checkpointId);
return code;
}
int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTaskCheckpointReq* pReq) {
@ -237,31 +210,55 @@ int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStre
int64_t checkpointId = pReq->checkpointId;
int32_t childId = pReq->childId;
if (taosArrayGetSize(pTask->pUpstreamEpInfoList) > 0) {
code = streamAlignCheckpoint(pTask, checkpointId, childId);
if (code > 0) {
// set the task status
pTask->status.taskStatus = TASK_STATUS__CK;
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG || pTask->info.taskLevel == TASK_LEVEL__SINK);
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
qDebug("s-task:%s sink task set to checkpoint ready", pTask->id.idStr);
appendCheckpointIntoInputQ(pTask);
streamSchedExec(pTask);
} else {
// todo close the inputQ for data from childId, which means data from childId are not allowed to put into intpuQ
// anymore
ASSERT(taosArrayGetSize(pTask->pUpstreamEpInfoList) > 0);
// there are still some upstream tasks not send checkpoint request
int32_t notReady = streamAlignCheckpoint(pTask, checkpointId, childId);
if (notReady > 0) {
int32_t num = taosArrayGetSize(pTask->pUpstreamEpInfoList);
qDebug("s-task:%s %d upstream tasks not send checkpoint info yet, total:%d", pTask->id.idStr, notReady, num);
return 0;
}
if (code < 0) {
ASSERT(0);
return -1;
}
}
// code = streamDoCheckpoint(pMeta, pTask, checkpointId);
if (code < 0) {
// rsp error
return -1;
}
qDebug("s-task:%s all upstream send checkpoint msg now, dispatch checkpoint msg to downstream", pTask->id.idStr);
pTask->checkpointNotReadyTasks = (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH)
? 1
: taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
// send rsp to all children
// if all upstreams are ready for generating checkpoint, set the status to be TASK_STATUS__CK_READY
// 2. dispatch check point msg to all downstream tasks
streamTaskDispatchCheckpointMsg(pTask, checkpointId);
}
return 0;
}
int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointRsp* pRsp) {
// recover step2, scan from wal
// unref wal
// set status normal
/**
* All down stream tasks have successfully completed the check point task.
* Current stream task is allowed to start to do checkpoint things in ASYNC model.
*/
int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTaskCheckpointRsp* pRsp) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG);
// only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task
int32_t notReady = atomic_sub_fetch_32(&pTask->checkpointNotReadyTasks, 1);
if (notReady == 0) {
qDebug("s-task:%s all downstream tasks have completed the checkpoint, start to do checkpoint for this task",
pTask->id.idStr);
appendCheckpointIntoInputQ(pTask);
streamSchedExec(pTask);
}
return 0;
}

View File

@ -15,7 +15,7 @@
#include "streamInt.h"
SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg) {
SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg) {
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, pReq->totalLen);
if (pData == NULL) {
return NULL;

View File

@ -14,8 +14,8 @@
*/
#include "streamInt.h"
#include "ttimer.h"
#include "trpc.h"
#include "ttimer.h"
#define MAX_BLOCK_NAME_NUM 1024
#define DISPATCH_RETRY_INTERVAL_MS 300
@ -26,57 +26,19 @@ typedef struct SBlockName {
char parTbName[TSDB_TABLE_NAME_LEN];
} SBlockName;
static int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->dataSrcVgId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->totalLen) < 0) return -1;
ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum);
ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum);
for (int32_t i = 0; i < pReq->blockNum; i++) {
int32_t len = *(int32_t*)taosArrayGet(pReq->dataLen, i);
void* data = taosArrayGetP(pReq->data, i);
if (tEncodeI32(pEncoder, len) < 0) return -1;
if (tEncodeBinary(pEncoder, data, len) < 0) return -1;
}
tEndEncode(pEncoder);
return pEncoder->pos;
}
static void doRetryDispatchData(void* param, void* tmrId);
static int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq);
static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq);
static int32_t doDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId,
SEpSet* pEpSet);
static int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet);
static int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock,
int32_t vgSz, int64_t groupId);
static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) {
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
void* buf = taosMemoryCalloc(1, dataStrLen);
if (buf == NULL) return -1;
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
pRetrieve->useconds = 0;
pRetrieve->precision = TSDB_DEFAULT_PRECISION;
pRetrieve->compressed = 0;
pRetrieve->completed = 1;
pRetrieve->streamBlockType = pBlock->info.type;
pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
pRetrieve->skey = htobe64(pBlock->info.window.skey);
pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
pRetrieve->version = htobe64(pBlock->info.version);
pRetrieve->watermark = htobe64(pBlock->info.watermark);
memcpy(pRetrieve->parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
pRetrieve->numOfCols = htonl(numOfCols);
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
actualLen += sizeof(SRetrieveTableRsp);
ASSERT(actualLen <= dataStrLen);
taosArrayPush(pReq->dataLen, &actualLen);
taosArrayPush(pReq->data, &buf);
pReq->totalLen += dataStrLen;
return 0;
static void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) {
pMsg->msgType = msgType;
pMsg->pCont = pCont;
pMsg->contLen = contLen;
}
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
@ -103,6 +65,7 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
taosArrayPush(pReq->dataLen, &len1);
taosArrayPush(pReq->data, &data);
}
tEndDecode(pDecoder);
return 0;
}
@ -220,7 +183,9 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
tEncodeStreamRetrieveReq(&encoder, &req);
tEncoderClear(&encoder);
SRpcMsg rpcMsg = {.code = 0, .msgType = TDMT_STREAM_RETRIEVE, .pCont = buf, .contLen = sizeof(SMsgHead) + len};
SRpcMsg rpcMsg = {0};
initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE, buf, len + sizeof(SMsgHead));
if (tmsgSendReq(&pEpInfo->epSet, &rpcMsg) < 0) {
ASSERT(0);
goto CLEAR;
@ -263,13 +228,9 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR
rpcFreeCont(buf);
return code;
}
tEncoderClear(&encoder);
msg.contLen = tlen + sizeof(SMsgHead);
msg.pCont = buf;
msg.msgType = TDMT_STREAM_TASK_CHECK;
initRpcMsg(&msg, TDMT_STREAM_TASK_CHECK, buf, tlen + sizeof(SMsgHead));
qDebug("s-task:%s (level:%d) dispatch check msg to s-task:%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr,
pTask->info.taskLevel, pReq->streamId, pReq->downstreamTaskId, nodeId);
@ -277,161 +238,6 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR
return 0;
}
int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId,
SEpSet* pEpSet) {
void* buf = NULL;
int32_t code = -1;
SRpcMsg msg = {0};
int32_t tlen;
tEncodeSize(tEncodeStreamScanHistoryFinishReq, pReq, tlen, code);
if (code < 0) {
return -1;
}
buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
((SMsgHead*)buf)->vgId = htonl(vgId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
if ((code = tEncodeStreamScanHistoryFinishReq(&encoder, pReq)) < 0) {
if (buf) {
rpcFreeCont(buf);
}
return code;
}
tEncoderClear(&encoder);
msg.contLen = tlen + sizeof(SMsgHead);
msg.pCont = buf;
msg.msgType = TDMT_STREAM_SCAN_HISTORY_FINISH;
msg.info.noResp = 1;
tmsgSendReq(pEpSet, &msg);
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
qDebug("s-task:%s status:%s dispatch scan-history-data finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pStatus,
pReq->taskId, vgId);
return 0;
}
static int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) {
void* buf = NULL;
int32_t code = -1;
SRpcMsg msg = {0};
// serialize
int32_t tlen;
tEncodeSize(tEncodeStreamDispatchReq, pReq, tlen, code);
if (code < 0) {
goto FAIL;
}
code = -1;
buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
if (buf == NULL) {
goto FAIL;
}
((SMsgHead*)buf)->vgId = htonl(vgId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
if ((code = tEncodeStreamDispatchReq(&encoder, pReq)) < 0) {
goto FAIL;
}
tEncoderClear(&encoder);
msg.contLen = tlen + sizeof(SMsgHead);
msg.pCont = buf;
msg.msgType = pTask->msgInfo.msgType;
qDebug("s-task:%s dispatch msg to taskId:0x%x vgId:%d data msg", pTask->id.idStr, pReq->taskId, vgId);
return tmsgSendReq(pEpSet, &msg);
FAIL:
if (buf) {
rpcFreeCont(buf);
}
return code;
}
int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz,
int64_t groupId) {
uint32_t hashValue = 0;
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
if (pTask->pNameMap == NULL) {
pTask->pNameMap = tSimpleHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
}
void* pVal = tSimpleHashGet(pTask->pNameMap, &groupId, sizeof(int64_t));
if (pVal) {
SBlockName* pBln = (SBlockName*)pVal;
hashValue = pBln->hashValue;
if (!pDataBlock->info.parTbName[0]) {
memset(pDataBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
memcpy(pDataBlock->info.parTbName, pBln->parTbName, strlen(pBln->parTbName));
}
} else {
char* ctbName = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN);
if (ctbName == NULL) {
return -1;
}
if (pDataBlock->info.parTbName[0]) {
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);
} else {
buildCtbNameByGroupIdImpl(pTask->shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName);
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);
}
/*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/
SUseDbRsp* pDbInfo = &pTask->shuffleDispatcher.dbInfo;
hashValue =
taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix);
taosMemoryFree(ctbName);
SBlockName bln = {0};
bln.hashValue = hashValue;
memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName));
if (tSimpleHashGetSize(pTask->pNameMap) < MAX_BLOCK_NAME_NUM) {
tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &bln, sizeof(SBlockName));
}
}
bool found = false;
// TODO: optimize search
int32_t j;
for (j = 0; j < vgSz; j++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
ASSERT(pVgInfo->vgId > 0);
if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
return -1;
}
if (pReqs[j].blockNum == 0) {
atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
}
pReqs[j].blockNum++;
found = true;
break;
}
}
ASSERT(found);
return 0;
}
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) {
int32_t code = 0;
@ -494,7 +300,6 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
// TODO: do not use broadcast
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
for (int32_t j = 0; j < vgSz; j++) {
if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
goto FAIL_SHUFFLE_DISPATCH;
@ -514,14 +319,14 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
}
}
qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr, pTask->info.selfChildId,
numOfBlocks, vgSz);
qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr,
pTask->info.selfChildId, numOfBlocks, vgSz);
for (int32_t i = 0; i < vgSz; i++) {
if (pReqs[i].blockNum > 0) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, pTask->info.selfChildId,
pReqs[i].blockNum, pVgInfo->vgId);
qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr,
pTask->info.selfChildId, pReqs[i].blockNum, pVgInfo->vgId);
code = doSendDispatchMsg(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet);
if (code < 0) {
@ -544,20 +349,8 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
return code;
}
static void doRetryDispatchData(void* param, void* tmrId) {
SStreamTask* pTask = param;
ASSERT(pTask->outputStatus == TASK_OUTPUT_STATUS__WAIT);
int32_t code = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData);
if (code != TSDB_CODE_SUCCESS) {
qDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr);
atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
}
}
void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration) {
qError("s-task:%s dispatch data in %"PRId64"ms", pTask->id.idStr, waitDuration);
qError("s-task:%s dispatch data in %" PRId64 "ms", pTask->id.idStr, waitDuration);
taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->launchTaskTimer);
}
@ -604,15 +397,15 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
// todo deal with only partially success dispatch case
atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);
if (terrno == TSDB_CODE_APP_IS_STOPPING) { // in case of this error, do not retry anymore
if (terrno == TSDB_CODE_APP_IS_STOPPING) { // in case of this error, do not retry anymore
destroyStreamDataBlock(pTask->msgInfo.pData);
pTask->msgInfo.pData = NULL;
return code;
}
if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry
qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms", pTask->id.idStr,
retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS);
if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry
qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms",
pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS);
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
break;
}
@ -622,13 +415,14 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamTaskCheckpointReq* pReq, int32_t nodeId, SEpSet* pEpSet) {
int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamTaskCheckpointReq* pReq, int32_t nodeId,
SEpSet* pEpSet) {
void* buf = NULL;
int32_t code = -1;
SRpcMsg msg = {0};
int32_t tlen;
tEncodeSize(tEncodeStreamTaskCheckpointReq, pReq, tlen, code);
tEncodeSize(tEncodeStreamCheckpointReq, pReq, tlen, code);
if (code < 0) {
return -1;
}
@ -643,21 +437,325 @@ int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamTaskCheckpo
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
if ((code = tEncodeStreamTaskCheckpointReq(&encoder, pReq)) < 0) {
if ((code = tEncodeStreamCheckpointReq(&encoder, pReq)) < 0) {
rpcFreeCont(buf);
return code;
}
tEncoderClear(&encoder);
msg.contLen = tlen + sizeof(SMsgHead);
msg.pCont = buf;
msg.msgType = TDMT_STREAM_TASK_CHECKPOINT;
initRpcMsg(&msg,TDMT_STREAM_TASK_CHECKPOINT, buf, tlen + sizeof(SMsgHead));
qDebug("s-task:%s (level:%d) dispatch checkpoint msg to s-task:%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr,
pTask->info.taskLevel, pReq->streamId, pReq->downstreamTaskId, nodeId);
tmsgSendReq(pEpSet, &msg);
return 0;
}
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
SStreamScanHistoryFinishReq req = {.streamId = pTask->id.streamId, .childId = pTask->info.selfChildId};
// serialize
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
req.taskId = pTask->fixedEpDispatcher.taskId;
doDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgs = taosArrayGetSize(vgInfo);
qDebug("s-task:%s send scan-history-data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s",
pTask->id.idStr, numOfVgs, streamGetTaskStatusStr(pTask->status.taskStatus));
for (int32_t i = 0; i < numOfVgs; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
req.taskId = pVgInfo->taskId;
doDispatchScanHistoryFinishMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
}
}
return 0;
}
// this function is usually invoked by sink/agg task
int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask, int32_t vgId) {
// int32_t code = 0;
// int32_t len;
//
// // todo set upstreamTaskId Info
// const SStreamTaskCheckpointRsp rsp = {
// .streamId = pTask->id.streamId, .downstreamTaskId = pTask->id.taskId, .downstreamNodeId = vgId};
//
// SEncoder encoder;
// tEncodeSize(tEncodeStreamCheckpointRsp, &rsp, len, code);
// if (code < 0) {
// qError("vgId:%d failed to encode checkpoint rsp, task:0x%x", vgId, rsp.downstreamTaskId);
// return -1;
// }
//
// void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
// ((SMsgHead*)buf)->vgId = htonl(rsp.upstreamNodeId);
//
// void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
// tEncoderInit(&encoder, (uint8_t*)abuf, len);
// tEncodeStreamCheckpointRsp(&encoder, &rsp);
// tEncoderClear(&encoder);
int32_t num = taosArrayGetSize(pTask->pRpcMsgList);
ASSERT(taosArrayGetSize(pTask->pUpstreamEpInfoList) == num);
qDebug("s-task:%s level:%d checkpoint completed msg sent to %d upstream tasks", pTask->id.idStr, pTask->info.taskLevel,
num);
for(int32_t i = 0; i < num; ++i) {
SRpcMsg* pMsg = taosArrayGetP(pTask->pRpcMsgList, 0);
tmsgSendRsp(pMsg);
}
return TSDB_CODE_SUCCESS;
}
// this function is only invoked by source task, and send rsp to mnode
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask, int32_t vgId) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
// int32_t code = 0;
// int32_t len;
// SEncoder encoder;
// SStreamMeta* pMeta = pTask->pMeta;
// const SStreamCheckpointSourceRsp rsp = {
// .streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .nodeId = vgId, .checkpointId = pMeta->checkpointId};
//
// tEncodeSize(tEncodeStreamCheckpointSourceRsp, &rsp, len, code);
// if (code < 0) {
// qError("vgId:%d failed to encode source checkpoint rsp, task:0x%x", vgId, pTask->id.taskId);
// return -1;
// }
//
// void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
// ((SMsgHead*)buf)->vgId = htonl(rsp.nodeId);
//
// void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
// tEncoderInit(&encoder, (uint8_t*)abuf, len);
// tEncodeStreamCheckpointSourceRsp(&encoder, &rsp);
// tEncoderClear(&encoder);
// SRpcMsg rspMsg = *pTask->rpcMsg;
ASSERT(taosArrayGetSize(pTask->pRpcMsgList) == 0);
qDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel);
tmsgSendRsp(taosArrayGetP(pTask->pRpcMsgList, 0));
return TSDB_CODE_SUCCESS;
}
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->dataSrcVgId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->totalLen) < 0) return -1;
ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum);
ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum);
for (int32_t i = 0; i < pReq->blockNum; i++) {
int32_t len = *(int32_t*)taosArrayGet(pReq->dataLen, i);
void* data = taosArrayGetP(pReq->data, i);
if (tEncodeI32(pEncoder, len) < 0) return -1;
if (tEncodeBinary(pEncoder, data, len) < 0) return -1;
}
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) {
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
void* buf = taosMemoryCalloc(1, dataStrLen);
if (buf == NULL) return -1;
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
pRetrieve->useconds = 0;
pRetrieve->precision = TSDB_DEFAULT_PRECISION;
pRetrieve->compressed = 0;
pRetrieve->completed = 1;
pRetrieve->streamBlockType = pBlock->info.type;
pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
pRetrieve->skey = htobe64(pBlock->info.window.skey);
pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
pRetrieve->version = htobe64(pBlock->info.version);
pRetrieve->watermark = htobe64(pBlock->info.watermark);
memcpy(pRetrieve->parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
pRetrieve->numOfCols = htonl(numOfCols);
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
actualLen += sizeof(SRetrieveTableRsp);
ASSERT(actualLen <= dataStrLen);
taosArrayPush(pReq->dataLen, &actualLen);
taosArrayPush(pReq->data, &buf);
pReq->totalLen += dataStrLen;
return 0;
}
int32_t doDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId,
SEpSet* pEpSet) {
void* buf = NULL;
int32_t code = -1;
SRpcMsg msg = {0};
int32_t tlen;
tEncodeSize(tEncodeStreamScanHistoryFinishReq, pReq, tlen, code);
if (code < 0) {
return -1;
}
buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
((SMsgHead*)buf)->vgId = htonl(vgId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
if ((code = tEncodeStreamScanHistoryFinishReq(&encoder, pReq)) < 0) {
if (buf) {
rpcFreeCont(buf);
}
return code;
}
tEncoderClear(&encoder);
msg.info.noResp = 1;
initRpcMsg(&msg,TDMT_STREAM_SCAN_HISTORY_FINISH, buf, tlen + sizeof(SMsgHead));
tmsgSendReq(pEpSet, &msg);
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
qDebug("s-task:%s status:%s dispatch scan-history-data finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pStatus,
pReq->taskId, vgId);
return 0;
}
int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) {
void* buf = NULL;
int32_t code = -1;
SRpcMsg msg = {0};
// serialize
int32_t tlen;
tEncodeSize(tEncodeStreamDispatchReq, pReq, tlen, code);
if (code < 0) {
goto FAIL;
}
code = -1;
buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
if (buf == NULL) {
goto FAIL;
}
((SMsgHead*)buf)->vgId = htonl(vgId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
if ((code = tEncodeStreamDispatchReq(&encoder, pReq)) < 0) {
goto FAIL;
}
tEncoderClear(&encoder);
initRpcMsg(&msg, pTask->msgInfo.msgType, buf, tlen + sizeof(SMsgHead));
qDebug("s-task:%s dispatch msg to taskId:0x%x vgId:%d data msg", pTask->id.idStr, pReq->taskId, vgId);
return tmsgSendReq(pEpSet, &msg);
FAIL:
if (buf) {
rpcFreeCont(buf);
}
return code;
}
int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz,
int64_t groupId) {
uint32_t hashValue = 0;
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
if (pTask->pNameMap == NULL) {
pTask->pNameMap = tSimpleHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
}
void* pVal = tSimpleHashGet(pTask->pNameMap, &groupId, sizeof(int64_t));
if (pVal) {
SBlockName* pBln = (SBlockName*)pVal;
hashValue = pBln->hashValue;
if (!pDataBlock->info.parTbName[0]) {
memset(pDataBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
memcpy(pDataBlock->info.parTbName, pBln->parTbName, strlen(pBln->parTbName));
}
} else {
char* ctbName = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN);
if (ctbName == NULL) {
return -1;
}
if (pDataBlock->info.parTbName[0]) {
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);
} else {
buildCtbNameByGroupIdImpl(pTask->shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName);
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);
}
/*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/
SUseDbRsp* pDbInfo = &pTask->shuffleDispatcher.dbInfo;
hashValue =
taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix);
taosMemoryFree(ctbName);
SBlockName bln = {0};
bln.hashValue = hashValue;
memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName));
if (tSimpleHashGetSize(pTask->pNameMap) < MAX_BLOCK_NAME_NUM) {
tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &bln, sizeof(SBlockName));
}
}
bool found = false;
// TODO: optimize search
int32_t j;
for (j = 0; j < vgSz; j++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
ASSERT(pVgInfo->vgId > 0);
if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
return -1;
}
if (pReqs[j].blockNum == 0) {
atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
}
pReqs[j].blockNum++;
found = true;
break;
}
}
ASSERT(found);
return 0;
}
void doRetryDispatchData(void* param, void* tmrId) {
SStreamTask* pTask = param;
ASSERT(pTask->outputStatus == TASK_OUTPUT_STATUS__WAIT);
int32_t code = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData);
if (code != TSDB_CODE_SUCCESS) {
qDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr);
atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
}
}

View File

@ -16,8 +16,6 @@
#include "streamInt.h"
// maximum allowed processed block batches. One block may include several submit blocks
#define MAX_STREAM_EXEC_BATCH_NUM 32
#define MIN_STREAM_EXEC_BATCH_NUM 4
#define MAX_STREAM_RESULT_DUMP_THRESHOLD 100
static int32_t updateCheckPointInfo(SStreamTask* pTask);
@ -399,61 +397,6 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS;
}
static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
const char* id) {
int32_t retryTimes = 0;
int32_t MAX_RETRY_TIMES = 5;
while (1) {
if (streamTaskShouldPause(&pTask->status)) {
qDebug("s-task:%s task should pause, input blocks:%d", pTask->id.idStr, *numOfBlocks);
return TSDB_CODE_SUCCESS;
}
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
if (qItem == NULL) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) {
taosMsleep(10);
qDebug("===stream===try again batchSize:%d, retry:%d", *numOfBlocks, retryTimes);
continue;
}
qDebug("===stream===break batchSize:%d", *numOfBlocks);
return TSDB_CODE_SUCCESS;
}
// do not merge blocks for sink node
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
*numOfBlocks = 1;
*pInput = qItem;
return TSDB_CODE_SUCCESS;
}
if (*pInput == NULL) {
ASSERT((*numOfBlocks) == 0);
*pInput = qItem;
} else {
// todo we need to sort the data block, instead of just appending into the array list.
void* newRet = streamMergeQueueItem(*pInput, qItem);
if (newRet == NULL) {
qError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks);
streamQueueProcessFail(pTask->inputQueue);
return TSDB_CODE_SUCCESS;
}
*pInput = newRet;
}
*numOfBlocks += 1;
streamQueueProcessSuccess(pTask->inputQueue);
if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
return TSDB_CODE_SUCCESS;
}
}
}
/**
* todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
* appropriate batch of blocks should be handled in 5 to 10 sec.
@ -468,21 +411,28 @@ int32_t streamExecForAll(SStreamTask* pTask) {
// merge multiple input data if possible in the input queue.
qDebug("s-task:%s start to extract data block from inputQ", id);
/*int32_t code = */extractMsgFromInputQ(pTask, &pInput, &batchSize, id);
/*int32_t code = */extractBlocksFromInputQ(pTask, &pInput, &batchSize, id);
if (pInput == NULL) {
ASSERT(batchSize == 0);
if (pTask->info.fillHistory && pTask->status.transferState) {
int32_t code = streamTransferStateToStreamTask(pTask);
}
break;
// no data in the inputQ, return now
return 0;
}
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize);
streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
continue;
if (pInput->type == STREAM_INPUT__DATA_BLOCK) {
qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize);
streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
continue;
}
} else {
ASSERT(pInput->type == STREAM_INPUT__CHECKPOINT);
ASSERT(pTask->status.taskStatus == TASK_STATUS__CK);
pTask->status.taskStatus = TASK_STATUS__CK_READY;
return 0;
}
int64_t st = taosGetTimestampMs();
@ -519,6 +469,9 @@ int32_t streamExecForAll(SStreamTask* pTask) {
} else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_CHECKPOINT) {
const SStreamCheckpoint* pCheckpoint = (const SStreamCheckpoint*) pInput;
qSetMultiStreamInput(pExecutor, pCheckpoint->pBlock, 1, STREAM_INPUT__CHECKPOINT);
} else {
ASSERT(0);
}
@ -533,9 +486,14 @@ int32_t streamExecForAll(SStreamTask* pTask) {
id, el, resSize / 1048576.0, totalBlocks);
streamFreeQitem(pInput);
}
return 0;
// do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed.
if (pInput->type == STREAM_INPUT__CHECKPOINT) {
ASSERT(pTask->status.taskStatus == TASK_STATUS__CK);
pTask->status.taskStatus = TASK_STATUS__CK_READY;
return 0;
}
}
}
bool streamTaskIsIdle(const SStreamTask* pTask) {
@ -574,9 +532,27 @@ int32_t streamTryExec(SStreamTask* pTask) {
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->status.schedStatus);
if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) &&
(!streamTaskShouldPause(&pTask->status))) {
streamSchedExec(pTask);
if (pTask->status.taskStatus == TASK_STATUS__CK_READY) {
// check for all tasks, and do generate the vnode-wide checkpoint data.
// todo extract method
SStreamMeta* pMeta = pTask->pMeta;
int32_t remain = atomic_sub_fetch_32(&pMeta->notCkptReadyTasks, 1);
if (remain <= 0) { // all tasks are in TASK_STATUS__CK_READY state
streamBackendDoCheckpoint(pMeta, pTask->checkpointingId);
}
// send check point response to upstream task
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
streamTaskSendCheckpointSourceRsp(pTask, pMeta->vgId);
} else {
streamTaskSendCheckpointRsp(pTask, pMeta->vgId);
}
} else {
if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) &&
(!streamTaskShouldPause(&pTask->status))) {
streamSchedExec(pTask);
}
}
}

View File

@ -107,8 +107,8 @@ SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) {
}
#endif
#define MAX_STREAM_EXEC_BATCH_NUM 128
#define MIN_STREAM_EXEC_BATCH_NUM 16
#define MAX_STREAM_EXEC_BATCH_NUM 32
#define MIN_STREAM_EXEC_BATCH_NUM 4
// todo refactor:
// read data from input queue
@ -119,6 +119,7 @@ typedef struct SQueueReader {
int32_t waitDuration; // maximum wait time to format several block into a batch to process, unit: ms
} SQueueReader;
#if 0
SStreamQueueItem* doReadMultiBlocksFromQueue(SQueueReader* pReader, const char* idstr) {
int32_t numOfBlocks = 0;
int32_t tryCount = 0;
@ -164,3 +165,58 @@ SStreamQueueItem* doReadMultiBlocksFromQueue(SQueueReader* pReader, const char*
return pRet;
}
#endif
int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, const char* id) {
int32_t retryTimes = 0;
int32_t MAX_RETRY_TIMES = 5;
while (1) {
if (streamTaskShouldPause(&pTask->status)) {
qDebug("s-task:%s task should pause, input blocks:%d", pTask->id.idStr, *numOfBlocks);
return TSDB_CODE_SUCCESS;
}
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
if (qItem == NULL) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) {
taosMsleep(10);
qDebug("===stream===try again batchSize:%d, retry:%d", *numOfBlocks, retryTimes);
continue;
}
qDebug("===stream===break batchSize:%d", *numOfBlocks);
return TSDB_CODE_SUCCESS;
}
// do not merge blocks for sink node and check point data block
if ((pTask->info.taskLevel == TASK_LEVEL__SINK) || (qItem->type == STREAM_INPUT__CHECKPOINT)) {
*numOfBlocks = 1;
*pInput = qItem;
return TSDB_CODE_SUCCESS;
}
if (*pInput == NULL) {
ASSERT((*numOfBlocks) == 0);
*pInput = qItem;
} else {
// todo we need to sort the data block, instead of just appending into the array list.
void* newRet = streamMergeQueueItem(*pInput, qItem);
if (newRet == NULL) {
qError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks);
streamQueueProcessFail(pTask->inputQueue);
return TSDB_CODE_SUCCESS;
}
*pInput = newRet;
}
*numOfBlocks += 1;
streamQueueProcessSuccess(pTask->inputQueue);
if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
return TSDB_CODE_SUCCESS;
}
}
}

View File

@ -286,29 +286,6 @@ int32_t streamSourceScanHistoryData(SStreamTask* pTask) {
return streamScanExec(pTask, 100);
}
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
SStreamScanHistoryFinishReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId };
// serialize
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
req.taskId = pTask->fixedEpDispatcher.taskId;
streamDoDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgs = taosArrayGetSize(vgInfo);
qDebug("s-task:%s send scan-history-data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s", pTask->id.idStr,
numOfVgs, streamGetTaskStatusStr(pTask->status.taskStatus));
for (int32_t i = 0; i < numOfVgs; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
req.taskId = pVgInfo->taskId;
streamDoDispatchScanHistoryFinishMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
}
}
return 0;
}
static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferReq* pReq, int32_t vgId, SEpSet* pEpSet) {
void* buf = NULL;
int32_t code = -1;