enh(stream): support dispatch checkpoint msg.
This commit is contained in:
parent
0e914a19c6
commit
d7101f7109
|
@ -47,6 +47,7 @@ enum {
|
||||||
TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner
|
TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner
|
||||||
TASK_STATUS__HALT, // stream task will handle all data in the input queue, and then paused
|
TASK_STATUS__HALT, // stream task will handle all data in the input queue, and then paused
|
||||||
TASK_STATUS__PAUSE,
|
TASK_STATUS__PAUSE,
|
||||||
|
TASK_STATUS__CK, // stream task is in checkpoint status, no data are allowed to put into inputQ anymore
|
||||||
};
|
};
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
|
@ -118,7 +119,7 @@ typedef struct {
|
||||||
} SStreamMergedSubmit;
|
} SStreamMergedSubmit;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t type;
|
int8_t type;
|
||||||
|
|
||||||
int32_t srcVgId;
|
int32_t srcVgId;
|
||||||
int32_t childId;
|
int32_t childId;
|
||||||
|
@ -134,14 +135,10 @@ typedef struct {
|
||||||
SSDataBlock* pBlock;
|
SSDataBlock* pBlock;
|
||||||
} SStreamRefDataBlock;
|
} SStreamRefDataBlock;
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int8_t type;
|
|
||||||
} SStreamCheckpoint;
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t type;
|
int8_t type;
|
||||||
SSDataBlock* pBlock;
|
SSDataBlock* pBlock;
|
||||||
} SStreamTrigger;
|
} SStreamTrigger, SStreamCheckpoint;
|
||||||
|
|
||||||
typedef struct SStreamQueueNode SStreamQueueNode;
|
typedef struct SStreamQueueNode SStreamQueueNode;
|
||||||
|
|
||||||
|
@ -486,8 +483,8 @@ typedef struct {
|
||||||
int64_t expireTime;
|
int64_t expireTime;
|
||||||
} SStreamCheckpointSourceRsp;
|
} SStreamCheckpointSourceRsp;
|
||||||
|
|
||||||
int32_t tEncodeSStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq);
|
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq);
|
||||||
int32_t tDecodeSStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq);
|
int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq);
|
||||||
|
|
||||||
int32_t tEncodeSStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp);
|
int32_t tEncodeSStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp);
|
||||||
int32_t tDecodeSStreamCheckpointSourceRsp(SDecoder* pDecoder, SStreamCheckpointSourceRsp* pRsp);
|
int32_t tDecodeSStreamCheckpointSourceRsp(SDecoder* pDecoder, SStreamCheckpointSourceRsp* pRsp);
|
||||||
|
@ -501,9 +498,7 @@ typedef struct {
|
||||||
int32_t upstreamTaskId;
|
int32_t upstreamTaskId;
|
||||||
int32_t upstreamNodeId;
|
int32_t upstreamNodeId;
|
||||||
int32_t childId;
|
int32_t childId;
|
||||||
int64_t expireTime;
|
} SStreamTaskCheckpointReq;
|
||||||
int8_t taskLevel;
|
|
||||||
} SStreamCheckpointReq;
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMsgHead msgHead;
|
SMsgHead msgHead;
|
||||||
|
@ -514,15 +509,13 @@ typedef struct {
|
||||||
int32_t upstreamTaskId;
|
int32_t upstreamTaskId;
|
||||||
int32_t upstreamNodeId;
|
int32_t upstreamNodeId;
|
||||||
int32_t childId;
|
int32_t childId;
|
||||||
int64_t expireTime;
|
|
||||||
int8_t taskLevel;
|
|
||||||
} SStreamCheckpointRsp;
|
} SStreamCheckpointRsp;
|
||||||
|
|
||||||
int32_t tEncodeSStreamCheckpointReq(SEncoder* pEncoder, const SStreamCheckpointReq* pReq);
|
int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq);
|
||||||
int32_t tDecodeSStreamCheckpointReq(SDecoder* pDecoder, SStreamCheckpointReq* pReq);
|
int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq);
|
||||||
|
|
||||||
int32_t tEncodeSStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointRsp* pRsp);
|
int32_t tEncodeStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointRsp* pRsp);
|
||||||
int32_t tDecodeSStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRsp);
|
int32_t tDecodeStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRsp);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t streamId;
|
int64_t streamId;
|
||||||
|
@ -633,7 +626,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver);
|
||||||
|
|
||||||
// checkpoint
|
// checkpoint
|
||||||
int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
|
int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
|
||||||
int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointReq* pReq);
|
int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTaskCheckpointReq* pReq);
|
||||||
int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointRsp* pRsp);
|
int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointRsp* pRsp);
|
||||||
|
|
||||||
int32_t streamTaskReleaseState(SStreamTask* pTask);
|
int32_t streamTaskReleaseState(SStreamTask* pTask);
|
||||||
|
|
|
@ -889,10 +889,7 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
|
||||||
pMsg->checkpointId = checkpointId;
|
pMsg->checkpointId = checkpointId;
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {
|
SRpcMsg rpcMsg = {
|
||||||
.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT,
|
.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = sizeof(SMStreamDoCheckpointMsg)};
|
||||||
.pCont = pMsg,
|
|
||||||
.contLen = sizeof(SMStreamDoCheckpointMsg),
|
|
||||||
};
|
|
||||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -909,7 +906,7 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, con
|
||||||
int32_t code;
|
int32_t code;
|
||||||
int32_t blen;
|
int32_t blen;
|
||||||
|
|
||||||
tEncodeSize(tEncodeSStreamCheckpointSourceReq, &req, blen, code);
|
tEncodeSize(tEncodeStreamCheckpointSourceReq, &req, blen, code);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -926,7 +923,7 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, con
|
||||||
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
SEncoder encoder;
|
SEncoder encoder;
|
||||||
tEncoderInit(&encoder, abuf, tlen);
|
tEncoderInit(&encoder, abuf, tlen);
|
||||||
tEncodeSStreamCheckpointSourceReq(&encoder, &req);
|
tEncodeStreamCheckpointSourceReq(&encoder, &req);
|
||||||
|
|
||||||
SMsgHead *pMsgHead = (SMsgHead *)buf;
|
SMsgHead *pMsgHead = (SMsgHead *)buf;
|
||||||
pMsgHead->contLen = htonl(tlen);
|
pMsgHead->contLen = htonl(tlen);
|
||||||
|
@ -950,7 +947,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
|
||||||
int32_t code;
|
int32_t code;
|
||||||
int32_t blen;
|
int32_t blen;
|
||||||
|
|
||||||
tEncodeSize(tEncodeSStreamCheckpointSourceReq, &req, blen, code);
|
tEncodeSize(tEncodeStreamCheckpointSourceReq, &req, blen, code);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -967,7 +964,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
|
||||||
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
SEncoder encoder;
|
SEncoder encoder;
|
||||||
tEncoderInit(&encoder, abuf, tlen);
|
tEncoderInit(&encoder, abuf, tlen);
|
||||||
tEncodeSStreamCheckpointSourceReq(&encoder, &req);
|
tEncodeStreamCheckpointSourceReq(&encoder, &req);
|
||||||
|
|
||||||
SMsgHead *pMsgHead = (SMsgHead *)buf;
|
SMsgHead *pMsgHead = (SMsgHead *)buf;
|
||||||
pMsgHead->contLen = htonl(tlen);
|
pMsgHead->contLen = htonl(tlen);
|
||||||
|
|
|
@ -174,7 +174,6 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq);
|
||||||
|
|
||||||
// tq util
|
// tq util
|
||||||
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock);
|
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock);
|
||||||
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem);
|
|
||||||
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
|
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
|
||||||
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
|
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
|
||||||
int32_t type, int64_t sver, int64_t ever);
|
int32_t type, int64_t sver, int64_t ever);
|
||||||
|
|
|
@ -219,7 +219,7 @@ int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
|
||||||
int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
|
int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
|
||||||
int tqUnregisterPushHandle(STQ* pTq, void* pHandle);
|
int tqUnregisterPushHandle(STQ* pTq, void* pHandle);
|
||||||
int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed.
|
int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed.
|
||||||
int32_t tqProcessStreamCheckPointReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen);
|
int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen);
|
||||||
int32_t tqCheckStreamStatus(STQ* pTq);
|
int32_t tqCheckStreamStatus(STQ* pTq);
|
||||||
|
|
||||||
int tqCommit(STQ*);
|
int tqCommit(STQ*);
|
||||||
|
|
|
@ -1278,59 +1278,6 @@ int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock) {
|
|
||||||
SDecoder* pCoder = &(SDecoder){0};
|
|
||||||
SDeleteRes* pRes = &(SDeleteRes){0};
|
|
||||||
|
|
||||||
*pRefBlock = NULL;
|
|
||||||
|
|
||||||
pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
|
|
||||||
if (pRes->uidList == NULL) {
|
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
|
|
||||||
tDecoderInit(pCoder, (uint8_t*)pData, len);
|
|
||||||
tDecodeDeleteRes(pCoder, pRes);
|
|
||||||
tDecoderClear(pCoder);
|
|
||||||
|
|
||||||
int32_t numOfTables = taosArrayGetSize(pRes->uidList);
|
|
||||||
if (numOfTables == 0 || pRes->affectedRows == 0) {
|
|
||||||
taosArrayDestroy(pRes->uidList);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
|
|
||||||
blockDataEnsureCapacity(pDelBlock, numOfTables);
|
|
||||||
pDelBlock->info.rows = numOfTables;
|
|
||||||
pDelBlock->info.version = ver;
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfTables; i++) {
|
|
||||||
// start key column
|
|
||||||
SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
|
||||||
colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false); // end key column
|
|
||||||
SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
|
||||||
colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);
|
|
||||||
// uid column
|
|
||||||
SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
|
|
||||||
int64_t* pUid = taosArrayGet(pRes->uidList, i);
|
|
||||||
colDataSetVal(pUidCol, i, (const char*)pUid, false);
|
|
||||||
|
|
||||||
colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX), i);
|
|
||||||
colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), i);
|
|
||||||
colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), i);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosArrayDestroy(pRes->uidList);
|
|
||||||
*pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
|
|
||||||
if (pRefBlock == NULL) {
|
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
|
|
||||||
(*pRefBlock)->type = STREAM_INPUT__REF_DATA_BLOCK;
|
|
||||||
(*pRefBlock)->pBlock = pDelBlock;
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
SStreamTaskRunReq* pReq = pMsg->pCont;
|
SStreamTaskRunReq* pReq = pMsg->pCont;
|
||||||
|
|
||||||
|
@ -1365,7 +1312,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
tqStartStreamTasks(pTq);
|
tqStartStreamTasks(pTq);
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
tqError("vgId:%d failed to found s-task, taskId:%d", vgId, taskId);
|
tqError("vgId:%d failed to found s-task, taskId:0x%x", vgId, taskId);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1593,14 +1540,34 @@ FAIL:
|
||||||
|
|
||||||
int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) { return sversion <= pTq->walLogLastVer; }
|
int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) { return sversion <= pTq->walLogLastVer; }
|
||||||
|
|
||||||
int32_t tqProcessStreamCheckPointReq(STQ* pTq, int64_t sversion, char* pMsg, int32_t msgLen) {
|
int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, int64_t sversion, char* pMsg, int32_t msgLen) {
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||||
char* msg = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
|
char* msg = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
|
||||||
int32_t len = msgLen - sizeof(SMsgHead);
|
int32_t len = msgLen - sizeof(SMsgHead);
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
streamDoCheckpoint(pMeta);
|
SStreamCheckpointSourceReq req= {0};
|
||||||
// taosWLockLatch(&pMeta->lock);
|
|
||||||
// taosWUnLockLatch(&pMeta->lock);
|
SDecoder decoder;
|
||||||
return 0;
|
tDecoderInit(&decoder, (uint8_t*)msg, len);
|
||||||
|
if (tDecodeStreamCheckpointSourceReq(&decoder, &req) < 0) {
|
||||||
|
code = TSDB_CODE_MSG_DECODE_ERROR;
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
goto FAIL;
|
||||||
|
}
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
|
||||||
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.taskId);
|
||||||
|
if (pTask == NULL) {
|
||||||
|
tqError("vgId:%d failed to find s-task:0x%x , it may have been destroyed already", vgId, req.taskId);
|
||||||
|
goto FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
streamProcessCheckpointSourceReq(pMeta, pTask, &req);
|
||||||
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
return code;
|
||||||
|
|
||||||
|
FAIL:
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,7 +29,7 @@ int32_t tqStreamTasksScanWal(STQ* pTq) {
|
||||||
int32_t scan = pMeta->walScanCounter;
|
int32_t scan = pMeta->walScanCounter;
|
||||||
tqDebug("vgId:%d continue check if data in wal are available, walScanCounter:%d", vgId, scan);
|
tqDebug("vgId:%d continue check if data in wal are available, walScanCounter:%d", vgId, scan);
|
||||||
|
|
||||||
// check all restore tasks
|
// check all tasks
|
||||||
bool shouldIdle = true;
|
bool shouldIdle = true;
|
||||||
createStreamTaskRunReq(pTq->pStreamMeta, &shouldIdle);
|
createStreamTaskRunReq(pTq->pStreamMeta, &shouldIdle);
|
||||||
|
|
||||||
|
@ -73,6 +73,7 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq) {
|
||||||
pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
|
pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
|
// broadcast the check downstream tasks msg
|
||||||
for (int32_t i = 0; i < numOfTasks; ++i) {
|
for (int32_t i = 0; i < numOfTasks; ++i) {
|
||||||
int32_t* pTaskId = taosArrayGet(pTaskList, i);
|
int32_t* pTaskId = taosArrayGet(pTaskList, i);
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, *pTaskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, *pTaskId);
|
||||||
|
@ -83,8 +84,8 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq) {
|
||||||
streamTaskCheckDownstreamTasks(pTask);
|
streamTaskCheckDownstreamTasks(pTask);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
}
|
}
|
||||||
taosArrayDestroy(pTaskList);
|
|
||||||
|
|
||||||
|
taosArrayDestroy(pTaskList);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,21 +20,6 @@
|
||||||
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
|
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
|
||||||
const SMqMetaRsp* pRsp, int32_t vgId);
|
const SMqMetaRsp* pRsp, int32_t vgId);
|
||||||
|
|
||||||
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem) {
|
|
||||||
int32_t code = tAppendDataToInputQueue(pTask, pQueueItem);
|
|
||||||
if (code < 0) {
|
|
||||||
tqError("s-task:%s failed to put into queue, too many", pTask->id.idStr);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (streamSchedExec(pTask) < 0) {
|
|
||||||
tqError("stream task:%d failed to be launched, code:%s", pTask->id.taskId, tstrerror(terrno));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq) {
|
int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq) {
|
||||||
pRsp->reqOffset = pReq->reqOffset;
|
pRsp->reqOffset = pReq->reqOffset;
|
||||||
|
|
||||||
|
@ -415,3 +400,60 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp*
|
||||||
tmsgSendRsp(&rsp);
|
tmsgSendRsp(&rsp);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock) {
|
||||||
|
SDecoder* pCoder = &(SDecoder){0};
|
||||||
|
SDeleteRes* pRes = &(SDeleteRes){0};
|
||||||
|
|
||||||
|
*pRefBlock = NULL;
|
||||||
|
|
||||||
|
pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
|
||||||
|
if (pRes->uidList == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
tDecoderInit(pCoder, (uint8_t*)pData, len);
|
||||||
|
tDecodeDeleteRes(pCoder, pRes);
|
||||||
|
tDecoderClear(pCoder);
|
||||||
|
|
||||||
|
int32_t numOfTables = taosArrayGetSize(pRes->uidList);
|
||||||
|
if (numOfTables == 0 || pRes->affectedRows == 0) {
|
||||||
|
taosArrayDestroy(pRes->uidList);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
|
||||||
|
blockDataEnsureCapacity(pDelBlock, numOfTables);
|
||||||
|
pDelBlock->info.rows = numOfTables;
|
||||||
|
pDelBlock->info.version = ver;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfTables; i++) {
|
||||||
|
// start key column
|
||||||
|
SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||||
|
colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false); // end key column
|
||||||
|
SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||||
|
colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);
|
||||||
|
// uid column
|
||||||
|
SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
|
||||||
|
int64_t* pUid = taosArrayGet(pRes->uidList, i);
|
||||||
|
colDataSetVal(pUidCol, i, (const char*)pUid, false);
|
||||||
|
|
||||||
|
colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX), i);
|
||||||
|
colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), i);
|
||||||
|
colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), i);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(pRes->uidList);
|
||||||
|
*pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
|
||||||
|
if (pRefBlock == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
(*pRefBlock)->type = STREAM_INPUT__REF_DATA_BLOCK;
|
||||||
|
(*pRefBlock)->pBlock = pDelBlock;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqCreateCheckpointBlock(SStreamCheckpoint** pCheckpointBlock) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
|
@ -499,7 +499,9 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
|
||||||
} break;
|
} break;
|
||||||
|
|
||||||
case TDMT_VND_STREAM_CHECK_POINT_SOURCE: {
|
case TDMT_VND_STREAM_CHECK_POINT_SOURCE: {
|
||||||
if (tqProcessStreamCheckPointReq(pVnode->pTq, ver, pReq, len) < 0) goto _err;
|
if (tqProcessStreamCheckPointSourceReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
} break;
|
} break;
|
||||||
case TDMT_VND_ALTER_CONFIRM:
|
case TDMT_VND_ALTER_CONFIRM:
|
||||||
needCommit = pVnode->config.hashChange;
|
needCommit = pVnode->config.hashChange;
|
||||||
|
|
|
@ -19,7 +19,7 @@
|
||||||
#include "executor.h"
|
#include "executor.h"
|
||||||
#include "query.h"
|
#include "query.h"
|
||||||
#include "tstream.h"
|
#include "tstream.h"
|
||||||
|
#include "streamBackendRocksdb.h"
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
@ -48,6 +48,7 @@ int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* p
|
||||||
|
|
||||||
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData);
|
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData);
|
||||||
int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
|
int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
|
||||||
|
int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamTaskCheckpointReq* pReq, int32_t nodeId, SEpSet* pEpSet);
|
||||||
|
|
||||||
int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId,
|
int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId,
|
||||||
SEpSet* pEpSet);
|
SEpSet* pEpSet);
|
||||||
|
|
|
@ -385,8 +385,8 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
||||||
}
|
}
|
||||||
} else if (type == STREAM_INPUT__CHECKPOINT) {
|
} else if (type == STREAM_INPUT__CHECKPOINT) {
|
||||||
taosWriteQitem(pTask->inputQueue->queue, pItem);
|
taosWriteQitem(pTask->inputQueue->queue, pItem);
|
||||||
} else if (type == STREAM_INPUT__GET_RES) {
|
qDebug("s-task:%s checkpoint enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
|
||||||
// use the default memory limit, refactor later.
|
} else if (type == STREAM_INPUT__GET_RES) { // use the default memory limit, refactor later.
|
||||||
taosWriteQitem(pTask->inputQueue->queue, pItem);
|
taosWriteQitem(pTask->inputQueue->queue, pItem);
|
||||||
qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
|
qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,7 +15,7 @@
|
||||||
|
|
||||||
#include "streamInt.h"
|
#include "streamInt.h"
|
||||||
|
|
||||||
int32_t tEncodeSStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
|
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1;
|
if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1;
|
||||||
|
@ -26,7 +26,7 @@ int32_t tEncodeSStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheck
|
||||||
return pEncoder->pos;
|
return pEncoder->pos;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDecodeSStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq) {
|
int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq) {
|
||||||
if (tStartDecode(pDecoder) < 0) return -1;
|
if (tStartDecode(pDecoder) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1;
|
||||||
|
@ -59,7 +59,7 @@ int32_t tDecodeSStreamCheckpointSourceRsp(SDecoder* pDecoder, SStreamCheckpointS
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tEncodeSStreamCheckpointReq(SEncoder* pEncoder, const SStreamCheckpointReq* pReq) {
|
int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) {
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1;
|
if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1;
|
||||||
|
@ -68,13 +68,11 @@ int32_t tEncodeSStreamCheckpointReq(SEncoder* pEncoder, const SStreamCheckpointR
|
||||||
if (tEncodeI64(pEncoder, pReq->upstreamTaskId) < 0) return -1;
|
if (tEncodeI64(pEncoder, pReq->upstreamTaskId) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pReq->upstreamNodeId) < 0) return -1;
|
if (tEncodeI64(pEncoder, pReq->upstreamNodeId) < 0) return -1;
|
||||||
if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1;
|
if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pReq->expireTime) < 0) return -1;
|
|
||||||
if (tEncodeI8(pEncoder, pReq->taskLevel) < 0) return -1;
|
|
||||||
tEndEncode(pEncoder);
|
tEndEncode(pEncoder);
|
||||||
return pEncoder->pos;
|
return pEncoder->pos;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDecodeSStreamCheckpointReq(SDecoder* pDecoder, SStreamCheckpointReq* pReq) {
|
int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq) {
|
||||||
if (tStartDecode(pDecoder) < 0) return -1;
|
if (tStartDecode(pDecoder) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1;
|
||||||
|
@ -83,13 +81,11 @@ int32_t tDecodeSStreamCheckpointReq(SDecoder* pDecoder, SStreamCheckpointReq* pR
|
||||||
if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
|
||||||
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
|
||||||
if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pReq->expireTime) < 0) return -1;
|
|
||||||
if (tDecodeI8(pDecoder, &pReq->taskLevel) < 0) return -1;
|
|
||||||
tEndDecode(pDecoder);
|
tEndDecode(pDecoder);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tEncodeSStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointRsp* pRsp) {
|
int32_t tEncodeStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointRsp* pRsp) {
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
|
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pRsp->checkpointId) < 0) return -1;
|
if (tEncodeI64(pEncoder, pRsp->checkpointId) < 0) return -1;
|
||||||
|
@ -98,13 +94,11 @@ int32_t tEncodeSStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointR
|
||||||
if (tEncodeI64(pEncoder, pRsp->upstreamTaskId) < 0) return -1;
|
if (tEncodeI64(pEncoder, pRsp->upstreamTaskId) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pRsp->upstreamNodeId) < 0) return -1;
|
if (tEncodeI64(pEncoder, pRsp->upstreamNodeId) < 0) return -1;
|
||||||
if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1;
|
if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pRsp->expireTime) < 0) return -1;
|
|
||||||
if (tEncodeI8(pEncoder, pRsp->taskLevel) < 0) return -1;
|
|
||||||
tEndEncode(pEncoder);
|
tEndEncode(pEncoder);
|
||||||
return pEncoder->pos;
|
return pEncoder->pos;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDecodeSStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRsp) {
|
int32_t tDecodeStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRsp) {
|
||||||
if (tStartDecode(pDecoder) < 0) return -1;
|
if (tStartDecode(pDecoder) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pRsp->checkpointId) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pRsp->checkpointId) < 0) return -1;
|
||||||
|
@ -113,8 +107,6 @@ int32_t tDecodeSStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pR
|
||||||
if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1;
|
||||||
if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1;
|
||||||
if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pRsp->expireTime) < 0) return -1;
|
|
||||||
if (tDecodeI8(pDecoder, &pRsp->taskLevel) < 0) return -1;
|
|
||||||
tEndDecode(pDecoder);
|
tEndDecode(pDecoder);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -130,24 +122,105 @@ static int32_t streamAlignCheckpoint(SStreamTask* pTask, int64_t checkpointId, i
|
||||||
return atomic_sub_fetch_32(&pTask->checkpointAlignCnt, 1);
|
return atomic_sub_fetch_32(&pTask->checkpointAlignCnt, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
// static int32_t streamDoCheckpoint(SStreamMeta* pMeta, SStreamTask* pTask, int64_t checkpointId) {
|
static int32_t streamTaskDispatchCheckpointMsg(SStreamTask* pTask, uint64_t checkpointId) {
|
||||||
// // commit tdb state
|
SStreamTaskCheckpointReq req = {
|
||||||
// streamStateCommit(pTask->pState);
|
.streamId = pTask->id.streamId,
|
||||||
// // commit non-tdb state
|
.upstreamTaskId = pTask->id.taskId,
|
||||||
// // copy and save new state
|
.upstreamNodeId = pTask->info.nodeId,
|
||||||
// // report to mnode
|
.downstreamNodeId = pTask->info.nodeId,
|
||||||
// // send checkpoint req to downstream
|
.downstreamTaskId = pTask->id.taskId,
|
||||||
// return 0;
|
.childId = pTask->info.selfChildId,
|
||||||
// }
|
.checkpointId = checkpointId,
|
||||||
|
};
|
||||||
|
|
||||||
|
// serialize
|
||||||
|
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
|
req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId;
|
||||||
|
req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
|
||||||
|
|
||||||
|
qDebug("s-task:%s dispatch checkpoint msg to task:0x%x(vgId:%d)", pTask->id.idStr, req.downstreamTaskId,
|
||||||
|
req.downstreamNodeId);
|
||||||
|
|
||||||
|
streamDispatchCheckpointMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
|
||||||
|
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
|
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||||
|
|
||||||
|
int32_t numOfVgs = taosArrayGetSize(vgInfo);
|
||||||
|
pTask->notReadyTasks = numOfVgs;
|
||||||
|
pTask->checkReqIds = taosArrayInit(numOfVgs, sizeof(int64_t));
|
||||||
|
|
||||||
|
qDebug("s-task:%s dispatch %d checkpoint msg to downstream", pTask->id.idStr, numOfVgs);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfVgs; i++) {
|
||||||
|
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
||||||
|
req.downstreamNodeId = pVgInfo->vgId;
|
||||||
|
req.downstreamTaskId = pVgInfo->taskId;
|
||||||
|
qDebug("s-task:%s (vgId:%d) checkpoint to task:0x%x (vgId:%d) (shuffle), idx:%d", pTask->id.idStr,
|
||||||
|
pTask->info.nodeId, req.downstreamTaskId, req.downstreamNodeId, i);
|
||||||
|
streamDispatchCheckpointMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
qDebug("s-task:%s (vgId:%d) sink task set to be ready for checkpointing", pTask->id.idStr, pTask->info.nodeId);
|
||||||
|
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SINK);
|
||||||
|
streamTaskLaunchScanHistory(pTask);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t streamDoSourceCheckpoint(SStreamMeta* pMeta, SStreamTask* pTask, int64_t checkpointId) {
|
|
||||||
// ref wal
|
|
||||||
// set status checkpointing
|
|
||||||
// do checkpoint
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// set status check pointing
|
||||||
|
// do checkpoint
|
||||||
|
static int32_t streamDoSourceCheckpoint(SStreamMeta* pMeta, SStreamTask* pTask, uint64_t checkpointId) {
|
||||||
|
int code = 0;
|
||||||
|
char buf[256] = {0};
|
||||||
|
|
||||||
|
int64_t ts = taosGetTimestampMs();
|
||||||
|
|
||||||
|
sprintf(buf, "%s/%s", pMeta->path, "checkpoints");
|
||||||
|
code = taosMulModeMkDir(buf, 0755);
|
||||||
|
if (code != 0) {
|
||||||
|
qError("failed to prepare checkpoint %s, checkpointId:%" PRIu64 ", reason:%s", buf, checkpointId, tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
pMeta->checkpointTs = ts;
|
||||||
|
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
|
||||||
|
|
||||||
|
// 1. set task status to be prepared for check point
|
||||||
|
pTask->status.taskStatus = TASK_STATUS__CK;
|
||||||
|
|
||||||
|
// 2. put the checkpoint data block into the inputQ, to enable the local status to be flushed to storage backend
|
||||||
|
{
|
||||||
|
SStreamCheckpoint* pChkpoint = taosAllocateQitem(sizeof(SStreamCheckpoint), DEF_QITEM, sizeof(SSDataBlock));
|
||||||
|
if (pChkpoint == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
pChkpoint->type = STREAM_INPUT__CHECKPOINT;
|
||||||
|
pChkpoint->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||||
|
if (pChkpoint->pBlock == NULL) {
|
||||||
|
taosFreeQitem(pChkpoint);
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
pChkpoint->pBlock->info.type = STREAM_CHECKPOINT;
|
||||||
|
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pChkpoint) < 0) {
|
||||||
|
taosFreeQitem(pChkpoint);
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
streamSchedExec(pTask);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. dispatch checkpoint msg to downstream task
|
||||||
|
streamTaskDispatchCheckpointMsg(pTask, checkpointId);
|
||||||
|
|
||||||
|
// code = streamBackendDoCheckpoint((void*)pMeta, buf);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq) {
|
int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq) {
|
||||||
int32_t code;
|
int32_t code = 0;
|
||||||
int64_t checkpointId = pReq->checkpointId;
|
int64_t checkpointId = pReq->checkpointId;
|
||||||
|
|
||||||
code = streamDoSourceCheckpoint(pMeta, pTask, checkpointId);
|
code = streamDoSourceCheckpoint(pMeta, pTask, checkpointId);
|
||||||
|
@ -159,7 +232,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointReq* pReq) {
|
int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTaskCheckpointReq* pReq) {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
int64_t checkpointId = pReq->checkpointId;
|
int64_t checkpointId = pReq->checkpointId;
|
||||||
int32_t childId = pReq->childId;
|
int32_t childId = pReq->childId;
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
|
|
||||||
#include "streamInt.h"
|
#include "streamInt.h"
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
|
#include "trpc.h"
|
||||||
|
|
||||||
#define MAX_BLOCK_NAME_NUM 1024
|
#define MAX_BLOCK_NAME_NUM 1024
|
||||||
#define DISPATCH_RETRY_INTERVAL_MS 300
|
#define DISPATCH_RETRY_INTERVAL_MS 300
|
||||||
|
@ -620,3 +621,43 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
|
||||||
// this block can not be deleted until it has been sent to downstream task successfully.
|
// this block can not be deleted until it has been sent to downstream task successfully.
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamTaskCheckpointReq* pReq, int32_t nodeId, SEpSet* pEpSet) {
|
||||||
|
void* buf = NULL;
|
||||||
|
int32_t code = -1;
|
||||||
|
SRpcMsg msg = {0};
|
||||||
|
|
||||||
|
int32_t tlen;
|
||||||
|
tEncodeSize(tEncodeStreamTaskCheckpointReq, pReq, tlen, code);
|
||||||
|
if (code < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
|
||||||
|
if (buf == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
((SMsgHead*)buf)->vgId = htonl(nodeId);
|
||||||
|
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
|
|
||||||
|
SEncoder encoder;
|
||||||
|
tEncoderInit(&encoder, abuf, tlen);
|
||||||
|
if ((code = tEncodeStreamTaskCheckpointReq(&encoder, pReq)) < 0) {
|
||||||
|
rpcFreeCont(buf);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
|
||||||
|
msg.contLen = tlen + sizeof(SMsgHead);
|
||||||
|
msg.pCont = buf;
|
||||||
|
msg.msgType = TDMT_STREAM_TASK_CHECKPOINT;
|
||||||
|
|
||||||
|
qDebug("s-task:%s (level:%d) dispatch checkpoint msg to s-task:%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr,
|
||||||
|
pTask->info.taskLevel, pReq->streamId, pReq->downstreamTaskId, nodeId);
|
||||||
|
|
||||||
|
tmsgSendReq(pEpSet, &msg);
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
|
@ -462,24 +462,3 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamDoCheckpoint(SStreamMeta* pMeta) {
|
|
||||||
int code = -1;
|
|
||||||
char buf[256] = {0};
|
|
||||||
|
|
||||||
int64_t ts = taosGetTimestampMs();
|
|
||||||
if (ts - pMeta->checkpointTs <= tsStreamCheckpointTickInterval * 1000) {
|
|
||||||
// avoid do checkpoint freq
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
pMeta->checkpointTs = ts;
|
|
||||||
|
|
||||||
sprintf(buf, "%s/%s", pMeta->path, "checkpoints");
|
|
||||||
code = taosMulModeMkDir(buf, 0755);
|
|
||||||
if (code != 0) {
|
|
||||||
qError("failed to create chechpoint %s, reason:%s", buf, tstrerror(code));
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
code = streamBackendDoCheckpoint((void*)pMeta, buf);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
#include "streamInt.h"
|
#include "streamInt.h"
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
#include "wal.h"
|
#include "wal.h"
|
||||||
|
#include "trpc.h"
|
||||||
|
|
||||||
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated) {
|
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated) {
|
||||||
SStreamScanHistoryReq req;
|
SStreamScanHistoryReq req;
|
||||||
|
@ -43,6 +44,7 @@ const char* streamGetTaskStatusStr(int32_t status) {
|
||||||
case TASK_STATUS__SCAN_HISTORY: return "scan-history";
|
case TASK_STATUS__SCAN_HISTORY: return "scan-history";
|
||||||
case TASK_STATUS__HALT: return "halt";
|
case TASK_STATUS__HALT: return "halt";
|
||||||
case TASK_STATUS__PAUSE: return "paused";
|
case TASK_STATUS__PAUSE: return "paused";
|
||||||
|
case TASK_STATUS__CK: return "check-point";
|
||||||
default:return "";
|
default:return "";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue