Merge pull request #29964 from taosdata/fix/chkptq

refactor(stream): handle the checkpoint msg from mnode with highest priority
This commit is contained in:
Simon Guan 2025-02-28 17:21:31 +08:00 committed by GitHub
commit d9e9213483
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 191 additions and 48 deletions

View File

@ -19,7 +19,7 @@
// message process // message process
int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart); int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart);
int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId); int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId);
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored); int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored, bool isLeader);
int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg);

View File

@ -137,6 +137,7 @@ enum {
STREAM_QUEUE__SUCESS = 1, STREAM_QUEUE__SUCESS = 1,
STREAM_QUEUE__FAILED, STREAM_QUEUE__FAILED,
STREAM_QUEUE__PROCESSING, STREAM_QUEUE__PROCESSING,
STREAM_QUEUE__CHKPTFAILED,
}; };
typedef enum EStreamTaskEvent { typedef enum EStreamTaskEvent {

View File

@ -157,7 +157,7 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
case TDMT_STREAM_TASK_DROP: case TDMT_STREAM_TASK_DROP:
return tqStreamTaskProcessDropReq(pSnode->pMeta, pMsg->pCont, pMsg->contLen); return tqStreamTaskProcessDropReq(pSnode->pMeta, pMsg->pCont, pMsg->contLen);
case TDMT_VND_STREAM_TASK_UPDATE: case TDMT_VND_STREAM_TASK_UPDATE:
return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true); return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true, true);
case TDMT_VND_STREAM_TASK_RESET: case TDMT_VND_STREAM_TASK_RESET:
return tqStreamTaskProcessTaskResetReq(pSnode->pMeta, pMsg->pCont); return tqStreamTaskProcessTaskResetReq(pSnode->pMeta, pMsg->pCont);
case TDMT_STREAM_TASK_PAUSE: case TDMT_STREAM_TASK_PAUSE:

View File

@ -1364,7 +1364,8 @@ int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) {
} }
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamTaskProcessUpdateReq(pTq->pStreamMeta, &pTq->pVnode->msgCb, pMsg, pTq->pVnode->restored); return tqStreamTaskProcessUpdateReq(pTq->pStreamMeta, &pTq->pVnode->msgCb, pMsg,
pTq->pVnode->restored, (pTq->pStreamMeta->role == NODE_ROLE_LEADER));
} }
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {

View File

@ -87,6 +87,8 @@ static void doStartScanWal(void* param, void* tmrId) {
tmr_h pTimer = NULL; tmr_h pTimer = NULL;
SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param; SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param;
tqDebug("start to do scan wal in tmr, metaRid:%" PRId64, pParam->metaId);
SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, pParam->metaId); SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, pParam->metaId);
if (pMeta == NULL) { if (pMeta == NULL) {
tqError("metaRid:%" PRId64 " not valid now, stream meta has been freed", pParam->metaId); tqError("metaRid:%" PRId64 " not valid now, stream meta has been freed", pParam->metaId);
@ -131,7 +133,7 @@ static void doStartScanWal(void* param, void* tmrId) {
} }
if (pMeta->startInfo.startAllTasks) { if (pMeta->startInfo.startAllTasks) {
tqTrace("vgId:%d in restart procedure, not ready to scan wal", vgId); tqDebug("vgId:%d in restart procedure, not ready to scan wal", vgId);
goto _end; goto _end;
} }
@ -154,7 +156,7 @@ static void doStartScanWal(void* param, void* tmrId) {
goto _end; goto _end;
} }
tqTrace("vgId:%d create msg to start wal scan, numOfTasks:%d", vgId, numOfTasks); tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d", vgId, numOfTasks);
#if 0 #if 0
// wait for the vnode is freed, and invalid read may occur. // wait for the vnode is freed, and invalid read may occur.
@ -317,9 +319,13 @@ bool taskReadyForDataFromWal(SStreamTask* pTask) {
return false; return false;
} }
// check if input queue is full or not // check whether input queue is full or not
if (streamQueueIsFull(pTask->inputq.queue)) { if (streamQueueIsFull(pTask->inputq.queue)) {
tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr); tqTrace("s-task:%s input queue is full, launch task without scanning wal", pTask->id.idStr);
int32_t code = streamTrySchedExec(pTask);
if (code) {
tqError("s-task:%s failed to start task while inputQ is full", pTask->id.idStr);
}
return false; return false;
} }

View File

@ -139,7 +139,7 @@ int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t stream
} }
// this is to process request from transaction, always return true. // this is to process request from transaction, always return true.
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) { int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored, bool isLeader) {
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead); int32_t len = pMsg->contLen - sizeof(SMsgHead);
@ -298,14 +298,19 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks); int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks);
if (restored) { if (restored && isLeader) {
tqDebug("vgId:%d s-task:0x%x update epset transId:%d, set the restart flag", vgId, req.taskId, req.transId); tqDebug("vgId:%d s-task:0x%x update epset transId:%d, set the restart flag", vgId, req.taskId, req.transId);
pMeta->startInfo.tasksWillRestart = 1; pMeta->startInfo.tasksWillRestart = 1;
} }
if (updateTasks < numOfTasks) { if (updateTasks < numOfTasks) {
if (isLeader) {
tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId, tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
updateTasks, (numOfTasks - updateTasks)); updateTasks, (numOfTasks - updateTasks));
} else {
tqDebug("vgId:%d closed tasks:%d, unclosed:%d, follower not restart tasks", vgId, updateTasks,
(numOfTasks - updateTasks));
}
} else { } else {
if ((code = streamMetaCommit(pMeta)) < 0) { if ((code = streamMetaCommit(pMeta)) < 0) {
// always return true // always return true
@ -316,6 +321,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
streamMetaClearSetUpdateTaskListComplete(pMeta); streamMetaClearSetUpdateTaskListComplete(pMeta);
if (isLeader) {
if (!restored) { if (!restored) {
tqDebug("vgId:%d vnode restore not completed, not start all tasks", vgId); tqDebug("vgId:%d vnode restore not completed, not start all tasks", vgId);
} else { } else {
@ -328,6 +334,9 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
tqError("vgId:%d async start all tasks, failed, code:%s", vgId, tstrerror(code)); tqError("vgId:%d async start all tasks, failed, code:%s", vgId, tstrerror(code));
} }
} }
} else {
tqDebug("vgId:%d follower nodes not restart tasks", vgId);
}
} }
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);

View File

@ -144,6 +144,8 @@ struct SStreamQueue {
STaosQall* qall; STaosQall* qall;
void* qItem; void* qItem;
int8_t status; int8_t status;
STaosQueue* pChkptQueue;
void* qChkptItem;
}; };
struct SStreamQueueItem { struct SStreamQueueItem {

View File

@ -1098,6 +1098,8 @@ static void chkptReadyMsgSendMonitorFn(void* param, void* tmrId) {
pActiveInfo = pTask->chkInfo.pActiveInfo; pActiveInfo = pTask->chkInfo.pActiveInfo;
pTmrInfo = &pActiveInfo->chkptReadyMsgTmr; pTmrInfo = &pActiveInfo->chkptReadyMsgTmr;
stDebug("s-task:%s acquire task, refId:%" PRId64, id, taskRefId);
// check the status every 100ms // check the status every 100ms
if (streamTaskShouldStop(pTask)) { if (streamTaskShouldStop(pTask)) {
streamCleanBeforeQuitTmr(pTmrInfo, param); streamCleanBeforeQuitTmr(pTmrInfo, param);

View File

@ -915,8 +915,35 @@ bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) {
} }
} }
/*
 * Tell the caller whether the task should NOT be run again right now.
 *
 * Returns true when:
 *   - the task state is STOP, PAUSE or DROPPING; or
 *   - the task is a source task in checkpoint (CK) state and its checkpoint
 *     queue holds no items (the ordinary block queue is not consulted); or
 *   - in every other case, both the ordinary block queue and the checkpoint
 *     queue hold no items.
 * Returns false otherwise.
 */
static bool shouldNotCont(SStreamTask* pTask) {
  const int32_t     taskLevel = pTask->info.taskLevel;
  SStreamQueue*     pInputQ = pTask->inputq.queue;
  const ETaskStatus state = streamTaskGetStatus(pTask).state;

  // both queue sizes are sampled up front, before any decision is taken
  const bool noChkptItems = (taosQueueItemSize(pInputQ->pChkptQueue) == 0);
  const bool noBlockItems = (streamQueueGetNumOfItems(pInputQ) == 0);

  // a stopped / paused / dropping task never continues
  switch (state) {
    case TASK_STATUS__STOP:
    case TASK_STATUS__PAUSE:
    case TASK_STATUS__DROPPING:
      return true;
    default:
      break;
  }

  // source task in checkpoint procedure: only the checkpoint queue matters
  if (state == TASK_STATUS__CK && taskLevel == TASK_LEVEL__SOURCE) {
    return noChkptItems;
  }

  // any other case: stop only when there is nothing left in either queue
  return noBlockItems && noChkptItems;
}
int32_t streamResumeTask(SStreamTask* pTask) { int32_t streamResumeTask(SStreamTask* pTask) {
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int32_t level = pTask->info.taskLevel;
int32_t code = 0; int32_t code = 0;
if (pTask->status.schedStatus != TASK_SCHED_STATUS__ACTIVE) { if (pTask->status.schedStatus != TASK_SCHED_STATUS__ACTIVE) {
@ -929,11 +956,10 @@ int32_t streamResumeTask(SStreamTask* pTask) {
if (code) { if (code) {
stError("s-task:%s failed to exec stream task, code:%s, continue", id, tstrerror(code)); stError("s-task:%s failed to exec stream task, code:%s, continue", id, tstrerror(code));
} }
// check if continue
streamMutexLock(&pTask->lock); streamMutexLock(&pTask->lock);
int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue); if (shouldNotCont(pTask)) {
if ((numOfItems == 0) || streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
streamTaskClearSchedIdleInfo(pTask); streamTaskClearSchedIdleInfo(pTask);
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);

View File

@ -32,11 +32,12 @@ typedef struct SQueueReader {
static bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id); static bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id);
static void streamTaskPutbackToken(STokenBucket* pBucket); static void streamTaskPutbackToken(STokenBucket* pBucket);
static void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes); static void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes);
static void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status, const char* id);
static void streamQueueCleanup(SStreamQueue* pQueue) { static void streamQueueCleanup(SStreamQueue* pQueue) {
SStreamQueueItem* qItem = NULL; SStreamQueueItem* qItem = NULL;
while (1) { while (1) {
streamQueueNextItem(pQueue, &qItem); streamQueueNextItemInSourceQ(pQueue, &qItem, TASK_STATUS__READY, NULL);
if (qItem == NULL) { if (qItem == NULL) {
break; break;
} }
@ -47,7 +48,9 @@ static void streamQueueCleanup(SStreamQueue* pQueue) {
int32_t streamQueueOpen(int64_t cap, SStreamQueue** pQ) { int32_t streamQueueOpen(int64_t cap, SStreamQueue** pQ) {
*pQ = NULL; *pQ = NULL;
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue)); SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue));
if (pQueue == NULL) { if (pQueue == NULL) {
@ -55,24 +58,26 @@ int32_t streamQueueOpen(int64_t cap, SStreamQueue** pQ) {
} }
code = taosOpenQueue(&pQueue->pQueue); code = taosOpenQueue(&pQueue->pQueue);
if (code) { TSDB_CHECK_CODE(code, lino, _error);
taosMemoryFreeClear(pQueue);
return code;
}
code = taosAllocateQall(&pQueue->qall); code = taosAllocateQall(&pQueue->qall);
if (code) { TSDB_CHECK_CODE(code, lino, _error);
taosCloseQueue(pQueue->pQueue);
taosMemoryFree(pQueue); code = taosOpenQueue(&pQueue->pChkptQueue);
return code; TSDB_CHECK_CODE(code, lino, _error);
}
pQueue->status = STREAM_QUEUE__SUCESS; pQueue->status = STREAM_QUEUE__SUCESS;
taosSetQueueCapacity(pQueue->pQueue, cap); taosSetQueueCapacity(pQueue->pQueue, cap);
taosSetQueueMemoryCapacity(pQueue->pQueue, cap * 1024); taosSetQueueMemoryCapacity(pQueue->pQueue, cap * 1024);
*pQ = pQueue; *pQ = pQueue;
return code; return code;
_error:
streamQueueClose(pQueue, 0);
stError("failed to open stream queue at line:%d, code:%s", lino, tstrerror(code));
return code;
} }
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) { void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) {
@ -82,6 +87,11 @@ void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) {
taosFreeQall(pQueue->qall); taosFreeQall(pQueue->qall);
taosCloseQueue(pQueue->pQueue); taosCloseQueue(pQueue->pQueue);
pQueue->pQueue = NULL;
taosCloseQueue(pQueue->pChkptQueue);
pQueue->pChkptQueue = NULL;
taosMemoryFree(pQueue); taosMemoryFree(pQueue);
} }
@ -94,6 +104,7 @@ void streamQueueNextItem(SStreamQueue* pQueue, SStreamQueueItem** pItem) {
} else { } else {
pQueue->qItem = NULL; pQueue->qItem = NULL;
(void) taosGetQitem(pQueue->qall, &pQueue->qItem); (void) taosGetQitem(pQueue->qall, &pQueue->qItem);
if (pQueue->qItem == NULL) { if (pQueue->qItem == NULL) {
(void) taosReadAllQitems(pQueue->pQueue, pQueue->qall); (void) taosReadAllQitems(pQueue->pQueue, pQueue->qall);
(void) taosGetQitem(pQueue->qall, &pQueue->qItem); (void) taosGetQitem(pQueue->qall, &pQueue->qItem);
@ -103,6 +114,56 @@ void streamQueueNextItem(SStreamQueue* pQueue, SStreamQueueItem** pItem) {
} }
} }
/*
 * Fetch the next item to process for a source task's input queue.
 *
 * The queue status is atomically switched to STREAM_QUEUE__PROCESSING and the
 * previous status decides what is handed out:
 *   - STREAM_QUEUE__CHKPTFAILED: the retained pQueue->qChkptItem is returned again;
 *   - STREAM_QUEUE__FAILED:      the retained pQueue->qItem is returned again;
 *   - otherwise: one item is read from the checkpoint queue first. If that queue
 *     yields nothing and the task is in TASK_STATUS__CK, nothing is returned and
 *     the ordinary queue is left untouched. In all remaining cases the next item
 *     is taken from pQueue->qall, which is refilled from pQueue->pQueue when it
 *     yields nothing.
 *
 * @param pQueue  stream queue holding the ordinary and the checkpoint queue
 * @param pItem   output; set to the fetched item, or NULL when nothing is available
 * @param status  current task status, used to gate reads from the ordinary queue
 * @param id      task id string used for logging only; may be NULL
 *                (streamQueueCleanup passes NULL)
 */
void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status, const char* id) {
  // a NULL pointer must never reach a "%s" conversion (undefined behavior), so
  // substitute a printable placeholder for logging purposes
  const char* idStr = (id != NULL) ? id : "(null)";

  *pItem = NULL;
  int8_t flag = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING);

  // the previous checkpoint item was not consumed, hand it out again
  if (flag == STREAM_QUEUE__CHKPTFAILED) {
    *pItem = pQueue->qChkptItem;
    return;
  }

  // the previous ordinary item was not consumed, hand it out again
  if (flag == STREAM_QUEUE__FAILED) {
    *pItem = pQueue->qItem;
    return;
  }

  // checkpoint queue has the highest priority
  pQueue->qChkptItem = NULL;
  taosReadQitem(pQueue->pChkptQueue, (void**)&pQueue->qChkptItem);
  if (pQueue->qChkptItem != NULL) {
    stDebug("s-task:%s read data from checkpoint queue, status:%d", idStr, status);
    *pItem = pQueue->qChkptItem;
    return;
  }

  // if in checkpoint status, not read data from ordinary input q.
  if (status == TASK_STATUS__CK) {
    stDebug("s-task:%s in checkpoint status, not read data in block queue, status:%d", idStr, status);
    return;
  }

  // let's try the ordinary input q
  pQueue->qItem = NULL;

  int32_t code = taosGetQitem(pQueue->qall, &pQueue->qItem);
  if (code) {
    stError("s-task:%s failed to get item in inputq, code:%s", idStr, tstrerror(code));
  }

  if (pQueue->qItem == NULL) {
    // qall yielded nothing: move everything pending in the ordinary queue into it and retry
    code = taosReadAllQitems(pQueue->pQueue, pQueue->qall);
    if (code) {
      stError("s-task:%s failed to get all items in inputq, code:%s", idStr, tstrerror(code));
    }

    code = taosGetQitem(pQueue->qall, &pQueue->qItem);
    if (code) {
      stError("s-task:%s failed to get item in inputq, code:%s", idStr, tstrerror(code));
    }
  }

  // NOTE(review): streamQueueCurItem presumably yields pQueue->qItem -- confirm against its definition
  *pItem = streamQueueCurItem(pQueue);
}
void streamQueueProcessSuccess(SStreamQueue* queue) { void streamQueueProcessSuccess(SStreamQueue* queue) {
if (atomic_load_8(&queue->status) != STREAM_QUEUE__PROCESSING) { if (atomic_load_8(&queue->status) != STREAM_QUEUE__PROCESSING) {
stError("invalid queue status:%d, expect:%d", atomic_load_8(&queue->status), STREAM_QUEUE__PROCESSING); stError("invalid queue status:%d, expect:%d", atomic_load_8(&queue->status), STREAM_QUEUE__PROCESSING);
@ -110,6 +171,7 @@ void streamQueueProcessSuccess(SStreamQueue* queue) {
} }
queue->qItem = NULL; queue->qItem = NULL;
queue->qChkptItem = NULL;
atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS); atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS);
} }
@ -121,6 +183,14 @@ void streamQueueProcessFail(SStreamQueue* queue) {
atomic_store_8(&queue->status, STREAM_QUEUE__FAILED); atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
} }
/*
 * Flag the queue as STREAM_QUEUE__CHKPTFAILED.
 *
 * The transition is only performed when the queue is currently in
 * STREAM_QUEUE__PROCESSING; any other status is reported as an error and the
 * status is left unchanged.
 */
void streamQueueGetSourceChkptFailed(SStreamQueue* pQueue) {
  if (atomic_load_8(&pQueue->status) == STREAM_QUEUE__PROCESSING) {
    atomic_store_8(&pQueue->status, STREAM_QUEUE__CHKPTFAILED);
    return;
  }

  // unexpected status: log it (re-read at log time) and do not touch the queue
  stError("invalid queue status:%d, expect:%d", atomic_load_8(&pQueue->status), STREAM_QUEUE__PROCESSING);
}
bool streamQueueIsFull(const SStreamQueue* pQueue) { bool streamQueueIsFull(const SStreamQueue* pQueue) {
int32_t numOfItems = streamQueueGetNumOfItems(pQueue); int32_t numOfItems = streamQueueGetNumOfItems(pQueue);
if (numOfItems >= STREAM_TASK_QUEUE_CAPACITY) { if (numOfItems >= STREAM_TASK_QUEUE_CAPACITY) {
@ -177,6 +247,7 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte
int32_t* blockSize) { int32_t* blockSize) {
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int32_t taskLevel = pTask->info.taskLevel; int32_t taskLevel = pTask->info.taskLevel;
SStreamQueue* pQueue = pTask->inputq.queue;
*pInput = NULL; *pInput = NULL;
*numOfBlocks = 0; *numOfBlocks = 0;
@ -189,13 +260,19 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte
} }
while (1) { while (1) {
if (streamTaskShouldPause(pTask) || streamTaskShouldStop(pTask)) { ETaskStatus status = streamTaskGetStatus(pTask).state;
stDebug("s-task:%s task should pause, extract input blocks:%d", id, *numOfBlocks); if (status == TASK_STATUS__PAUSE || status == TASK_STATUS__STOP) {
stDebug("s-task:%s task should pause/stop, extract input blocks:%d", id, *numOfBlocks);
return EXEC_CONTINUE; return EXEC_CONTINUE;
} }
SStreamQueueItem* qItem = NULL; SStreamQueueItem* qItem = NULL;
streamQueueNextItem(pTask->inputq.queue, (SStreamQueueItem**)&qItem); if (taskLevel == TASK_LEVEL__SOURCE) {
streamQueueNextItemInSourceQ(pQueue, &qItem, status, id);
} else {
streamQueueNextItem(pQueue, &qItem);
}
if (qItem == NULL) { if (qItem == NULL) {
// restore the token to bucket // restore the token to bucket
if (*numOfBlocks > 0) { if (*numOfBlocks > 0) {
@ -225,14 +302,19 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte
*numOfBlocks = 1; *numOfBlocks = 1;
*pInput = qItem; *pInput = qItem;
return EXEC_CONTINUE; return EXEC_CONTINUE;
} else { // previous existed blocks needs to be handle, before handle the checkpoint msg block } else { // previous existed blocks needs to be handled, before handle the checkpoint msg block
stDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks); stDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks);
*blockSize = streamQueueItemGetSize(*pInput); *blockSize = streamQueueItemGetSize(*pInput);
if (taskLevel == TASK_LEVEL__SINK) { if (taskLevel == TASK_LEVEL__SINK) {
streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize); streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
} }
streamQueueProcessFail(pTask->inputq.queue); if ((type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__CHECKPOINT) &&
(taskLevel == TASK_LEVEL__SOURCE)) {
streamQueueGetSourceChkptFailed(pQueue);
} else {
streamQueueProcessFail(pQueue);
}
return EXEC_CONTINUE; return EXEC_CONTINUE;
} }
} else { } else {
@ -252,7 +334,7 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte
streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize); streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
} }
streamQueueProcessFail(pTask->inputq.queue); streamQueueProcessFail(pQueue);
return EXEC_CONTINUE; return EXEC_CONTINUE;
} }
@ -260,7 +342,7 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte
} }
*numOfBlocks += 1; *numOfBlocks += 1;
streamQueueProcessSuccess(pTask->inputq.queue); streamQueueProcessSuccess(pQueue);
if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) { if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
stDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM); stDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
@ -279,6 +361,7 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) { int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) {
int8_t type = pItem->type; int8_t type = pItem->type;
STaosQueue* pQueue = pTask->inputq.queue->pQueue; STaosQueue* pQueue = pTask->inputq.queue->pQueue;
int32_t level = pTask->info.taskLevel;
int32_t total = streamQueueGetNumOfItems(pTask->inputq.queue) + 1; int32_t total = streamQueueGetNumOfItems(pTask->inputq.queue) + 1;
if (type == STREAM_INPUT__DATA_SUBMIT) { if (type == STREAM_INPUT__DATA_SUBMIT) {
@ -326,7 +409,19 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
stDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size); stDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
} else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER || } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__DATA_RETRIEVE) { type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__DATA_RETRIEVE) {
int32_t code = taosWriteQitem(pQueue, pItem);
int32_t code = 0;
if ((type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__CHECKPOINT) && (level == TASK_LEVEL__SOURCE)) {
STaosQueue* pChkptQ = pTask->inputq.queue->pChkptQueue;
code = taosWriteQitem(pChkptQ, pItem);
double size = SIZE_IN_MiB(taosQueueMemorySize(pChkptQ));
int32_t num = taosQueueItemSize(pChkptQ);
stDebug("s-task:%s level:%d %s checkpoint enqueue ctrl queue, total in queue:%d, size:%.2fMiB, data queue:%d",
pTask->id.idStr, pTask->info.taskLevel, streamQueueItemGetTypeStr(type), num, size, (total - 1));
} else {
code = taosWriteQitem(pQueue, pItem);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
streamFreeQitem(pItem); streamFreeQitem(pItem);
return code; return code;
@ -335,6 +430,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue)); double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
stDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, stDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr,
pTask->info.taskLevel, streamQueueItemGetTypeStr(type), total, size); pTask->info.taskLevel, streamQueueItemGetTypeStr(type), total, size);
}
} else if (type == STREAM_INPUT__GET_RES) { } else if (type == STREAM_INPUT__GET_RES) {
// use the default memory limit, refactor later. // use the default memory limit, refactor later.
int32_t code = taosWriteQitem(pQueue, pItem); int32_t code = taosWriteQitem(pQueue, pItem);