fix(stream): load term from sync model to decide if the node is restart or not

This commit is contained in:
Haojun Liao 2023-08-15 15:52:06 +08:00
parent c9f3e0b0e1
commit 39079ab64a
12 changed files with 60 additions and 55 deletions

View File

@ -417,10 +417,10 @@ typedef struct {
} SStreamTaskRunReq;
typedef struct {
int64_t streamId;
int32_t taskId;
int32_t type;
int64_t stage; //nodeId from upstream task
int64_t streamId;
int32_t taskId;
int32_t srcVgId;
int32_t upstreamTaskId;
int32_t upstreamChildId;

View File

@ -866,7 +866,7 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = sizeof(SMStreamDoCheckpointMsg)};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
// tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
return 0;
}

View File

@ -172,6 +172,8 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
int32_t type, int64_t sver, int64_t ever);
int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq);
void tqUpdateNodeStage(STQ* pTq);
#ifdef __cplusplus
}
#endif

View File

@ -225,7 +225,6 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqCheckStreamStatus(STQ* pTq);
int64_t tqGetNodeStage(STQ* pTq);
int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);

View File

@ -128,8 +128,7 @@ int32_t tqInitialize(STQ* pTq) {
return -1;
}
int64_t stage = tqGetNodeStage(pTq);
pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId, stage);
pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId, -1);
if (pTq->pStreamMeta == NULL) {
return -1;
}
@ -1531,7 +1530,8 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
return 0;
} else {
tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, taskId);
return TSDB_CODE_INVALID_MSG;
terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
return terrno;
}
}
@ -1930,8 +1930,3 @@ int32_t tqProcessTaskStopReq(STQ* pTq, SRpcMsg* pMsg) {
tmsgSendRsp(&rsp);
return 0;
}
int64_t tqGetNodeStage(STQ* pTq) {
SSyncState state = syncGetState(pTq->pVnode->sync);
return state.term;
}

View File

@ -26,12 +26,6 @@ int32_t tqStreamTasksScanWal(STQ* pTq) {
SStreamMeta* pMeta = pTq->pStreamMeta;
int64_t st = taosGetTimestampMs();
// update the cached nodeId when reading data from WAL files.
int64_t nodeStage = tqGetNodeStage(pTq);
if (pMeta->stage != nodeStage) {
pMeta->stage = nodeStage;
}
while (1) {
int32_t scan = pMeta->walScanCounter;
tqDebug("vgId:%d continue check if data in wal are available, walScanCounter:%d", vgId, scan);

View File

@ -35,6 +35,11 @@ int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq) {
return 0;
}
void tqUpdateNodeStage(STQ* pTq) {
SSyncState state = syncGetState(pTq->pVnode->sync);
pTq->pStreamMeta->stage = state.term;
}
static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) {
pRsp->reqOffset = pReq->reqOffset;

View File

@ -412,11 +412,6 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
// open tq
sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR);
taosRealPath(tdir, NULL, sizeof(tdir));
pVnode->pTq = tqOpen(tdir, pVnode);
if (pVnode->pTq == NULL) {
vError("vgId:%d, failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open sma
if (smaOpen(pVnode, rollback)) {
@ -444,6 +439,12 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
goto _err;
}
pVnode->pTq = tqOpen(tdir, pVnode);
if (pVnode->pTq == NULL) {
vError("vgId:%d, failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
if (rollback) {
vnodeRollback(pVnode);
}

View File

@ -14,6 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include "tq.h"
#include "vnd.h"
#define BATCH_ENABLE 0
@ -588,6 +589,9 @@ static void vnodeBecomeLearner(const SSyncFSM *pFsm) {
static void vnodeBecomeLeader(const SSyncFSM *pFsm) {
SVnode *pVnode = pFsm->data;
if (pVnode->pTq) {
tqUpdateNodeStage(pVnode->pTq);
}
vDebug("vgId:%d, become leader", pVnode->config.vgId);
}

View File

@ -47,14 +47,38 @@ void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) {
pMsg->contLen = contLen;
}
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->stage) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->srcVgId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->type) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->totalLen) < 0) return -1;
ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum);
ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum);
for (int32_t i = 0; i < pReq->blockNum; i++) {
int32_t len = *(int32_t*)taosArrayGet(pReq->dataLen, i);
void* data = taosArrayGetP(pReq->data, i);
if (tEncodeI32(pEncoder, len) < 0) return -1;
if (tEncodeBinary(pEncoder, data, len) < 0) return -1;
}
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->stage) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->srcVgId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->type) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->type) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->srcVgId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1;
@ -581,30 +605,6 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->stage) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->srcVgId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->type) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->totalLen) < 0) return -1;
ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum);
ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum);
for (int32_t i = 0; i < pReq->blockNum; i++) {
int32_t len = *(int32_t*)taosArrayGet(pReq->dataLen, i);
void* data = taosArrayGetP(pReq->data, i);
if (tEncodeI32(pEncoder, len) < 0) return -1;
if (tEncodeBinary(pEncoder, data, len) < 0) return -1;
}
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) {
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
void* buf = taosMemoryCalloc(1, dataStrLen);
@ -827,10 +827,15 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
// flag. here we need to retry dispatch this message to downstream task immediately. handle the case the failure
// happened too fast.
// todo handle the shuffle dispatch failure
qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", pTask->id.idStr,
pRsp->downstreamTaskId, tstrerror(code), ++pTask->msgInfo.retryCount);
int32_t ret = doDispatchAllBlocks(pTask, pTask->msgInfo.pData);
if (ret != TSDB_CODE_SUCCESS) {
if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // no retry
qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, no retry", pTask->id.idStr,
pRsp->downstreamTaskId, tstrerror(code));
} else {
qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", pTask->id.idStr,
pRsp->downstreamTaskId, tstrerror(code), ++pTask->msgInfo.retryCount);
int32_t ret = doDispatchAllBlocks(pTask, pTask->msgInfo.pData);
if (ret != TSDB_CODE_SUCCESS) {
}
}
return TSDB_CODE_SUCCESS;

View File

@ -124,7 +124,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
taosInitRWLatch(&pMeta->lock);
taosThreadMutexInit(&pMeta->backendMutex, NULL);
qInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64, vgId, chkpId);
qInfo("vgId:%d open stream meta successfully, latest checkpoint:%"PRId64", stage:%" PRId64, vgId, chkpId, stage);
return pMeta;
_err:

View File

@ -212,7 +212,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
pTask->id.idStr, upstreamTaskId, vgId, stage, pInfo->stage);
}
return ((pTask->status.downstreamReady == 1) && (pInfo->stage == upstreamTaskId))? 1:0;
return ((pTask->status.downstreamReady == 1) && (pInfo->stage == stage))? 1:0;
}
static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {