Merge branch 'enh/triggerCheckPoint2' of https://github.com/taosdata/TDengine into enh/triggerCheckPoint2

This commit is contained in:
liuyao 2023-08-18 15:33:37 +08:00
commit 9afd5155ed
6 changed files with 49 additions and 5 deletions

View File

@ -826,6 +826,25 @@ TEST(clientCase, projection_query_tables) {
}
taos_free_result(pRes);
int64_t start = 1685959190000;
int32_t code = -1;
for(int32_t i = 0; i < 1000000; ++i) {
char t[512] = {0};
sprintf(t, "insert into t1 values(%ld, %ld)", start + i, i);
while(1) {
void* p = taos_query(pConn, t);
code = taos_errno(p);
taos_free_result(p);
if (code != 0) {
printf("insert data error, retry\n");
} else {
break;
}
}
}
for (int32_t i = 0; i < 1; ++i) {
printf("create table :%d\n", i);
createNewTable(pConn, i);

View File

@ -918,6 +918,7 @@ void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); }
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
int32_t vgId = TD_VID(pTq->pVnode);
tqDebug("s-task:0x%x start to expand task", pTask->id.taskId);
int32_t code = streamTaskInit(pTask, pTq->pStreamMeta, &pTq->pVnode->msgCb, ver);
if (code != TSDB_CODE_SUCCESS) {

View File

@ -41,8 +41,9 @@ extern SStreamGlobalEnv streamEnv;
extern int32_t streamBackendId;
extern int32_t streamBackendCfWrapperId;
void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration);
int32_t streamDispatchStreamBlock(SStreamTask* pTask);
const char* streamGetBlockTypeStr(int32_t type);
void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration);
int32_t streamDispatchStreamBlock(SStreamTask* pTask);
int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock);
SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg);

View File

@ -397,8 +397,8 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
}
} else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__TRANS_STATE) {
taosWriteQitem(pTask->inputQueue->queue, pItem);
qDebug("s-task:%s level:%d checkpoint(trigger)/trans-state blockdata enqueue, total in queue:%d, size:%.2fMiB",
pTask->id.idStr, pTask->info.taskLevel, total, size);
qDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB",
pTask->id.idStr, pTask->info.taskLevel, streamGetBlockTypeStr(type), total, size);
} else if (type == STREAM_INPUT__GET_RES) {
// use the default memory limit, refactor later.
taosWriteQitem(pTask->inputQueue->queue, pItem);

View File

@ -244,3 +244,16 @@ void streamFreeQitem(SStreamQueueItem* data) {
taosFreeQitem(pBlock);
}
}
const char* streamGetBlockTypeStr(int32_t type) {
switch (type) {
case STREAM_INPUT__CHECKPOINT:
return "checkpoint";
case STREAM_INPUT__CHECKPOINT_TRIGGER:
return "checkpoint-triggre";
case STREAM_INPUT__TRANS_STATE:
return "trans-state";
default:
return "";
}
}

View File

@ -221,7 +221,17 @@ int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, i
if (qItem->type == STREAM_INPUT__CHECKPOINT || qItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
qItem->type == STREAM_INPUT__TRANS_STATE) {
if (*pInput == NULL) {
qDebug("s-task:%s checkpoint/transtate msg extracted, start to process immediately", id);
char* p = NULL;
if (qItem->type == STREAM_INPUT__CHECKPOINT) {
p = "checkpoint";
} else if (qItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
p = "checkpoint-trigger";
} else {
p = "transtate";
}
qDebug("s-task:%s %s msg extracted, start to process immediately", id, p);
*numOfBlocks = 1;
*pInput = qItem;
return TSDB_CODE_SUCCESS;