From edbad4577213590720f94cd4cfea0721e1dfd985 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 18 Aug 2023 13:45:08 +0800 Subject: [PATCH] other: add logs. --- source/client/test/clientTests.cpp | 19 +++++++++++++++++++ source/dnode/vnode/src/tq/tq.c | 1 + source/libs/stream/inc/streamInt.h | 5 +++-- source/libs/stream/src/stream.c | 4 ++-- source/libs/stream/src/streamData.c | 13 +++++++++++++ source/libs/stream/src/streamQueue.c | 12 +++++++++++- 6 files changed, 49 insertions(+), 5 deletions(-) diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 4e5e9e45d2..a1c8690dfc 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -826,6 +826,25 @@ TEST(clientCase, projection_query_tables) { } taos_free_result(pRes); + int64_t start = 1685959190000; + + int32_t code = -1; + for(int32_t i = 0; i < 1000000; ++i) { + char t[512] = {0}; + + sprintf(t, "insert into t1 values(%ld, %ld)", start + i, i); + while(1) { + void* p = taos_query(pConn, t); + code = taos_errno(p); + taos_free_result(p); + if (code != 0) { + printf("insert data error, retry\n"); + } else { + break; + } + } + } + for (int32_t i = 0; i < 1; ++i) { printf("create table :%d\n", i); createNewTable(pConn, i); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index bc66f5a3ae..38453ee81a 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -918,6 +918,7 @@ void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); } int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { int32_t vgId = TD_VID(pTq->pVnode); + tqDebug("s-task:0x%x start to expand task", pTask->id.taskId); int32_t code = streamTaskInit(pTask, pTq->pStreamMeta, &pTq->pVnode->msgCb, ver); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index a9e6c2e625..837afad072 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -41,8 +41,9 @@ extern SStreamGlobalEnv streamEnv; extern int32_t streamBackendId; extern int32_t streamBackendCfWrapperId; -void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration); -int32_t streamDispatchStreamBlock(SStreamTask* pTask); +const char* streamGetBlockTypeStr(int32_t type); +void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration); +int32_t streamDispatchStreamBlock(SStreamTask* pTask); int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock); SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index df9813ad9b..1e6a389486 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -397,8 +397,8 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { } } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__TRANS_STATE) { taosWriteQitem(pTask->inputQueue->queue, pItem); - qDebug("s-task:%s level:%d checkpoint(trigger)/trans-state blockdata enqueue, total in queue:%d, size:%.2fMiB", - pTask->id.idStr, pTask->info.taskLevel, total, size); + qDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", + pTask->id.idStr, pTask->info.taskLevel, streamGetBlockTypeStr(type), total, size); } else if (type == STREAM_INPUT__GET_RES) { // use the default memory limit, refactor later. taosWriteQitem(pTask->inputQueue->queue, pItem); diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 041cf1f0cd..f2e3b8529a 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -244,3 +244,16 @@ void streamFreeQitem(SStreamQueueItem* data) { taosFreeQitem(pBlock); } } + +const char* streamGetBlockTypeStr(int32_t type) { + switch (type) { + case STREAM_INPUT__CHECKPOINT: + return "checkpoint"; + case STREAM_INPUT__CHECKPOINT_TRIGGER: + return "checkpoint-triggre"; + case STREAM_INPUT__TRANS_STATE: + return "trans-state"; + default: + return ""; + } +} \ No newline at end of file diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 9a33e7a1e1..194be382ff 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -221,7 +221,17 @@ int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, i if (qItem->type == STREAM_INPUT__CHECKPOINT || qItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER || qItem->type == STREAM_INPUT__TRANS_STATE) { if (*pInput == NULL) { - qDebug("s-task:%s checkpoint/transtate msg extracted, start to process immediately", id); + + char* p = NULL; + if (qItem->type == STREAM_INPUT__CHECKPOINT) { + p = "checkpoint"; + } else if (qItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { + p = "checkpoint-trigger"; + } else { + p = "transtate"; + } + + qDebug("s-task:%s %s msg extracted, start to process immediately", id, p); *numOfBlocks = 1; *pInput = qItem; return TSDB_CODE_SUCCESS;