enh(stream): add checkpoint queue for source tasks.
This commit is contained in:
parent
2e24c8aaa9
commit
67853b4a54
|
@ -136,6 +136,7 @@ enum {
|
|||
enum {
|
||||
STREAM_QUEUE__SUCESS = 1,
|
||||
STREAM_QUEUE__FAILED,
|
||||
STREAM_QUEUE__CHKPTFAILED,
|
||||
STREAM_QUEUE__PROCESSING,
|
||||
};
|
||||
|
||||
|
|
|
@ -144,6 +144,8 @@ struct SStreamQueue {
|
|||
STaosQall* qall;
|
||||
void* qItem;
|
||||
int8_t status;
|
||||
STaosQueue* pChkptQueue;
|
||||
void* qChkptItem;
|
||||
};
|
||||
|
||||
struct SStreamQueueItem {
|
||||
|
|
|
@ -47,7 +47,9 @@ static void streamQueueCleanup(SStreamQueue* pQueue) {
|
|||
|
||||
int32_t streamQueueOpen(int64_t cap, SStreamQueue** pQ) {
|
||||
*pQ = NULL;
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue));
|
||||
if (pQueue == NULL) {
|
||||
|
@ -55,24 +57,26 @@ int32_t streamQueueOpen(int64_t cap, SStreamQueue** pQ) {
|
|||
}
|
||||
|
||||
code = taosOpenQueue(&pQueue->pQueue);
|
||||
if (code) {
|
||||
taosMemoryFreeClear(pQueue);
|
||||
return code;
|
||||
}
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = taosAllocateQall(&pQueue->qall);
|
||||
if (code) {
|
||||
taosCloseQueue(pQueue->pQueue);
|
||||
taosMemoryFree(pQueue);
|
||||
return code;
|
||||
}
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = taosOpenQueue(&pQueue->pChkptQueue);
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
|
||||
pQueue->status = STREAM_QUEUE__SUCESS;
|
||||
|
||||
taosSetQueueCapacity(pQueue->pQueue, cap);
|
||||
taosSetQueueMemoryCapacity(pQueue->pQueue, cap * 1024);
|
||||
|
||||
*pQ = pQueue;
|
||||
return code;
|
||||
|
||||
_error:
|
||||
streamQueueClose(pQueue, 0);
|
||||
stError("failed to open stream queue at line:%d, code:%s", lino, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) {
|
||||
|
@ -82,6 +86,11 @@ void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) {
|
|||
|
||||
taosFreeQall(pQueue->qall);
|
||||
taosCloseQueue(pQueue->pQueue);
|
||||
pQueue->pQueue = NULL;
|
||||
|
||||
taosCloseQueue(pQueue->pChkptQueue);
|
||||
pQueue->pChkptQueue = NULL;
|
||||
|
||||
taosMemoryFree(pQueue);
|
||||
}
|
||||
|
||||
|
@ -94,6 +103,46 @@ void streamQueueNextItem(SStreamQueue* pQueue, SStreamQueueItem** pItem) {
|
|||
} else {
|
||||
pQueue->qItem = NULL;
|
||||
(void) taosGetQitem(pQueue->qall, &pQueue->qItem);
|
||||
|
||||
if (pQueue->qItem == NULL) {
|
||||
(void) taosReadAllQitems(pQueue->pQueue, pQueue->qall);
|
||||
(void) taosGetQitem(pQueue->qall, &pQueue->qItem);
|
||||
}
|
||||
|
||||
*pItem = streamQueueCurItem(pQueue);
|
||||
}
|
||||
}
|
||||
|
||||
void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status) {
|
||||
*pItem = NULL;
|
||||
int8_t flag = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING);
|
||||
|
||||
if (flag == STREAM_QUEUE__CHKPTFAILED) {
|
||||
*pItem = pQueue->qChkptItem;
|
||||
} else {
|
||||
pQueue->qChkptItem = NULL;
|
||||
taosReadQitem(pQueue->pChkptQueue, (void**) &pQueue->qChkptItem);
|
||||
if (pQueue->qChkptItem != NULL) {
|
||||
stDebug("read data from checkpoint queue, status:%d", status);
|
||||
|
||||
*pItem = pQueue->qChkptItem;
|
||||
return;
|
||||
}
|
||||
|
||||
// if in checkpoint status, not read data from ordinary input q.
|
||||
if (status == TASK_STATUS__CK) {
|
||||
stDebug("in checkpoint status, not ready data in normal queue");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// let's try the ordinary input q
|
||||
if (flag == STREAM_QUEUE__FAILED) {
|
||||
*pItem = streamQueueCurItem(pQueue);
|
||||
} else {
|
||||
pQueue->qItem = NULL;
|
||||
(void) taosGetQitem(pQueue->qall, &pQueue->qItem);
|
||||
|
||||
if (pQueue->qItem == NULL) {
|
||||
(void) taosReadAllQitems(pQueue->pQueue, pQueue->qall);
|
||||
(void) taosGetQitem(pQueue->qall, &pQueue->qItem);
|
||||
|
@ -110,6 +159,7 @@ void streamQueueProcessSuccess(SStreamQueue* queue) {
|
|||
}
|
||||
|
||||
queue->qItem = NULL;
|
||||
queue->qChkptItem = NULL;
|
||||
atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS);
|
||||
}
|
||||
|
||||
|
@ -121,6 +171,14 @@ void streamQueueProcessFail(SStreamQueue* queue) {
|
|||
atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
|
||||
}
|
||||
|
||||
void streamQueueGetSourceChkptFailed(SStreamQueue* pQueue) {
|
||||
if (atomic_load_8(&pQueue->status) != STREAM_QUEUE__PROCESSING) {
|
||||
stError("invalid queue status:%d, expect:%d", atomic_load_8(&pQueue->status), STREAM_QUEUE__PROCESSING);
|
||||
return;
|
||||
}
|
||||
atomic_store_8(&pQueue->status, STREAM_QUEUE__CHKPTFAILED);
|
||||
}
|
||||
|
||||
bool streamQueueIsFull(const SStreamQueue* pQueue) {
|
||||
int32_t numOfItems = streamQueueGetNumOfItems(pQueue);
|
||||
if (numOfItems >= STREAM_TASK_QUEUE_CAPACITY) {
|
||||
|
@ -175,8 +233,9 @@ const char* streamQueueItemGetTypeStr(int32_t type) {
|
|||
|
||||
EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
|
||||
int32_t* blockSize) {
|
||||
const char* id = pTask->id.idStr;
|
||||
int32_t taskLevel = pTask->info.taskLevel;
|
||||
const char* id = pTask->id.idStr;
|
||||
int32_t taskLevel = pTask->info.taskLevel;
|
||||
SStreamQueue* pQueue = pTask->inputq.queue;
|
||||
|
||||
*pInput = NULL;
|
||||
*numOfBlocks = 0;
|
||||
|
@ -189,13 +248,19 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte
|
|||
}
|
||||
|
||||
while (1) {
|
||||
if (streamTaskShouldPause(pTask) || streamTaskShouldStop(pTask)) {
|
||||
stDebug("s-task:%s task should pause, extract input blocks:%d", id, *numOfBlocks);
|
||||
ETaskStatus status = streamTaskGetStatus(pTask).state;
|
||||
if (status == TASK_STATUS__PAUSE || status == TASK_STATUS__STOP || status == TASK_STATUS__STOP) {
|
||||
stDebug("s-task:%s task should pause/stop, extract input blocks:%d", id, *numOfBlocks);
|
||||
return EXEC_CONTINUE;
|
||||
}
|
||||
|
||||
SStreamQueueItem* qItem = NULL;
|
||||
streamQueueNextItem(pTask->inputq.queue, (SStreamQueueItem**)&qItem);
|
||||
if (taskLevel == TASK_LEVEL__SOURCE) {
|
||||
streamQueueNextItemInSourceQ(pQueue, &qItem, status);
|
||||
} else {
|
||||
streamQueueNextItem(pQueue, &qItem);
|
||||
}
|
||||
|
||||
if (qItem == NULL) {
|
||||
// restore the token to bucket
|
||||
if (*numOfBlocks > 0) {
|
||||
|
@ -225,14 +290,19 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte
|
|||
*numOfBlocks = 1;
|
||||
*pInput = qItem;
|
||||
return EXEC_CONTINUE;
|
||||
} else { // previous existed blocks needs to be handle, before handle the checkpoint msg block
|
||||
} else { // previous existed blocks needs to be handled, before handle the checkpoint msg block
|
||||
stDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks);
|
||||
*blockSize = streamQueueItemGetSize(*pInput);
|
||||
if (taskLevel == TASK_LEVEL__SINK) {
|
||||
streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
|
||||
}
|
||||
|
||||
streamQueueProcessFail(pTask->inputq.queue);
|
||||
if ((type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__CHECKPOINT) &&
|
||||
(taskLevel == TASK_LEVEL__SOURCE)) {
|
||||
streamQueueGetSourceChkptFailed(pQueue);
|
||||
} else {
|
||||
streamQueueProcessFail(pQueue);
|
||||
}
|
||||
return EXEC_CONTINUE;
|
||||
}
|
||||
} else {
|
||||
|
@ -252,7 +322,7 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte
|
|||
streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
|
||||
}
|
||||
|
||||
streamQueueProcessFail(pTask->inputq.queue);
|
||||
streamQueueProcessFail(pQueue);
|
||||
return EXEC_CONTINUE;
|
||||
}
|
||||
|
||||
|
@ -260,7 +330,7 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte
|
|||
}
|
||||
|
||||
*numOfBlocks += 1;
|
||||
streamQueueProcessSuccess(pTask->inputq.queue);
|
||||
streamQueueProcessSuccess(pQueue);
|
||||
|
||||
if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
|
||||
stDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
|
||||
|
@ -279,6 +349,7 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte
|
|||
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
||||
int8_t type = pItem->type;
|
||||
STaosQueue* pQueue = pTask->inputq.queue->pQueue;
|
||||
int32_t level = pTask->info.taskLevel;
|
||||
int32_t total = streamQueueGetNumOfItems(pTask->inputq.queue) + 1;
|
||||
|
||||
if (type == STREAM_INPUT__DATA_SUBMIT) {
|
||||
|
@ -326,15 +397,28 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
|
|||
stDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
|
||||
} else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
|
||||
type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__DATA_RETRIEVE) {
|
||||
int32_t code = taosWriteQitem(pQueue, pItem);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
streamFreeQitem(pItem);
|
||||
return code;
|
||||
}
|
||||
|
||||
double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
|
||||
stDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr,
|
||||
pTask->info.taskLevel, streamQueueItemGetTypeStr(type), total, size);
|
||||
int32_t code = 0;
|
||||
if ((type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__CHECKPOINT) && level == TASK_LEVEL__SOURCE) {
|
||||
STaosQueue* pChkptQ = pTask->inputq.queue->pChkptQueue;
|
||||
code = taosWriteQitem(pChkptQ, pItem);
|
||||
|
||||
double size = SIZE_IN_MiB(taosQueueMemorySize(pChkptQ));
|
||||
int32_t num = taosQueueItemSize(pChkptQ);
|
||||
|
||||
stDebug("s-task:%s level:%d %s checkpoint enqueue ctrl queue, total in queue:%d, size:%.2fMiB, data queue:%d",
|
||||
pTask->id.idStr, pTask->info.taskLevel, streamQueueItemGetTypeStr(type), num, size, (total - 1));
|
||||
} else {
|
||||
code = taosWriteQitem(pQueue, pItem);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
streamFreeQitem(pItem);
|
||||
return code;
|
||||
}
|
||||
|
||||
double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
|
||||
stDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr,
|
||||
pTask->info.taskLevel, streamQueueItemGetTypeStr(type), total, size);
|
||||
}
|
||||
} else if (type == STREAM_INPUT__GET_RES) {
|
||||
// use the default memory limit, refactor later.
|
||||
int32_t code = taosWriteQitem(pQueue, pItem);
|
||||
|
|
Loading…
Reference in New Issue