From 3fc3dafca61e65c37628a438ee0c002116a43aa2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 31 Aug 2023 13:46:48 +0800 Subject: [PATCH 01/19] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 35 +++++++-------- source/dnode/snode/src/snode.c | 2 +- source/dnode/vnode/src/tq/tq.c | 45 +------------------ source/dnode/vnode/src/tq/tqRestore.c | 4 +- source/libs/stream/inc/streamInt.h | 3 -- source/libs/stream/src/stream.c | 24 +++++----- source/libs/stream/src/streamCheckpoint.c | 2 +- source/libs/stream/src/streamDispatch.c | 22 ++------- source/libs/stream/src/streamExec.c | 14 +++--- source/libs/stream/src/streamMeta.c | 6 +-- source/libs/stream/src/streamQueue.c | 54 +++++++++++------------ source/libs/stream/src/streamTask.c | 24 +++++----- 12 files changed, 86 insertions(+), 149 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 48bf2451a0..929a330e8a 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -179,7 +179,7 @@ SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue); #endif typedef struct { - STaosQueue* queue; + STaosQueue* pQueue; STaosQall* qall; void* qItem; int8_t status; @@ -297,6 +297,7 @@ typedef struct SSTaskBasicInfo { int32_t totalLevel; int8_t taskLevel; int8_t fillHistory; // is fill history task or not + int64_t triggerParam; // in msec } SSTaskBasicInfo; typedef struct SDispatchMsgInfo { @@ -306,12 +307,23 @@ typedef struct SDispatchMsgInfo { int64_t blockingTs; // output blocking timestamp } SDispatchMsgInfo; -typedef struct { +typedef struct STaskOutputInfo { int8_t type; int8_t status; SStreamQueue* queue; } STaskOutputInfo; +typedef struct STaskInputInfo { + int8_t status; + SStreamQueue* queue; +} STaskInputInfo; + +typedef struct STaskSchedInfo { + int8_t status; +// int64_t triggerParam; + void* pTimer; +} STaskSchedInfo; + typedef struct { int64_t init; int64_t step1Start; @@ -323,6 +335,8 @@ struct SStreamTask { SStreamTaskId id; SSTaskBasicInfo info; STaskOutputInfo outputInfo; + STaskInputInfo inputInfo; + STaskSchedInfo schedInfo; SDispatchMsgInfo msgInfo; SStreamStatus status; SCheckpointInfo chkInfo; @@ -330,8 +344,6 @@ struct SStreamTask { SHistDataRange dataRange; SStreamTaskId historyTaskId; SStreamTaskId streamTaskId; - int32_t nextCheckId; - SArray* checkpointInfo; // SArray STaskTimestamp tsInfo; SArray* pReadyMsgList; // SArray TdThreadMutex lock; // secure the operation of set task status and puting data into inputQ @@ -346,13 +358,6 @@ struct SStreamTask { STaskSinkFetch fetchSink; }; - int8_t inputStatus; - SStreamQueue* inputQueue; - - // trigger - int8_t triggerStatus; - int64_t triggerParam; - void* schedTimer; void* launchTaskTimer; SMsgCb* pMsgCb; // msg handle SStreamState* pState; // state backend @@ -596,14 +601,6 @@ typedef struct SStreamTaskNodeUpdateMsg { int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg); int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg); -typedef struct SStreamTaskNodeUpdateRsp { - int64_t streamId; - int32_t taskId; -} SStreamTaskNodeUpdateRsp; - -int32_t tEncodeStreamTaskUpdateRsp(SEncoder* pEncoder, const SStreamTaskNodeUpdateRsp* pMsg); -int32_t tDecodeStreamTaskUpdateRsp(SDecoder* pDecoder, SStreamTaskNodeUpdateRsp* pMsg); - typedef struct { int64_t streamId; int32_t downstreamTaskId; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 06ef01a3f1..c1a59416f6 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -99,7 +99,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { " child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms", SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer, pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), - pTask->info.fillHistory, pTask->triggerParam); + pTask->info.fillHistory, pTask->info.triggerParam); return 0; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 65e6ee4433..5d47baad64 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -915,7 +915,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { " child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms", vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer, pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), - pTask->info.fillHistory, pTask->triggerParam); + pTask->info.fillHistory, pTask->info.triggerParam); return 0; } @@ -1504,7 +1504,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { streamStartScanHistoryAsync(pTask, igUntreated); - } else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) { + } else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputInfo.queue->pQueue) == 0)) { tqScanWalAsync(pTq, false); } else { streamSchedExec(pTask); @@ -1832,44 +1832,3 @@ _end: return rsp.code; } -int32_t tqProcessTaskStopReq(STQ* pTq, SRpcMsg* pMsg) { - int32_t vgId = TD_VID(pTq->pVnode); - char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS}; - - SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg; - - SStreamMeta* pMeta = pTq->pStreamMeta; - SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); - if (pTask == NULL) { - tqError("vgId:%d process stop req, failed to acquire task:0x%x, it may have been dropped already", vgId, - pReq->taskId); - // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active - return TSDB_CODE_SUCCESS; - } - - tqDebug("s-task:%s receive stop msg from mnode", pTask->id.idStr); - streamTaskStop(pTask); - - SStreamTask* pHistoryTask = NULL; - if (pTask->historyTaskId.taskId != 0) { - pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId); - if (pHistoryTask == NULL) { - tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%x, it may have been dropped already", - pMeta->vgId, pTask->historyTaskId.taskId); - streamMetaReleaseTask(pMeta, pTask); - - // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active - return TSDB_CODE_SUCCESS; - } - - tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr); - - streamTaskStop(pHistoryTask); - streamMetaReleaseTask(pMeta, pHistoryTask); - } - - streamMetaReleaseTask(pMeta, pTask); - tmsgSendRsp(&rsp); - return 0; -} diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 37763a690d..5efccc8f3c 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -340,14 +340,14 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } - if (streamQueueIsFull(pTask->inputQueue->queue)) { + if (streamQueueIsFull(pTask->inputInfo.queue->pQueue)) { tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr); streamMetaReleaseTask(pStreamMeta, pTask); continue; } // downstream task has blocked the output, stopped for a while - if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) { + if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) { tqDebug("s-task:%s inputQ is blocked, do nothing", pTask->id.idStr); streamMetaReleaseTask(pStreamMeta, pTask); continue; diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index a485e94824..bb81582a2d 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -77,9 +77,6 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask); int32_t streamTaskFillHistoryFinished(SStreamTask* pTask); int32_t streamTransferStateToStreamTask(SStreamTask* pTask); -extern int32_t streamBackendId; -extern int32_t streamBackendCfWrapperId; - #ifdef __cplusplus } #endif diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 3b045a8ad7..1f93498557 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -59,8 +59,8 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { static void streamSchedByTimer(void* param, void* tmrId) { SStreamTask* pTask = (void*)param; - int8_t status = atomic_load_8(&pTask->triggerStatus); - qDebug("s-task:%s in scheduler, trigger status:%d, next:%dms", pTask->id.idStr, status, (int32_t)pTask->triggerParam); + int8_t status = atomic_load_8(&pTask->schedInfo.status); + qDebug("s-task:%s in scheduler, trigger status:%d, next:%dms", pTask->id.idStr, status, (int32_t)pTask->info.triggerParam); if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { qDebug("s-task:%s jump out of schedTimer", pTask->id.idStr); @@ -80,29 +80,29 @@ static void streamSchedByTimer(void* param, void* tmrId) { return; } - atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE); + atomic_store_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE); pTrigger->pBlock->info.type = STREAM_GET_ALL; if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger) < 0) { taosFreeQitem(pTrigger); - taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->schedTimer); + taosTmrReset(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamEnv.timer, &pTask->schedInfo.pTimer); return; } streamSchedExec(pTask); } - taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->schedTimer); + taosTmrReset(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamEnv.timer, &pTask->schedInfo.pTimer); } int32_t streamSetupScheduleTrigger(SStreamTask* pTask) { - if (pTask->triggerParam != 0 && pTask->info.fillHistory == 0) { + if (pTask->info.triggerParam != 0 && pTask->info.fillHistory == 0) { int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1); - ASSERT(ref == 2 && pTask->schedTimer == NULL); + ASSERT(ref == 2 && pTask->schedInfo.pTimer == NULL); - qDebug("s-task:%s setup scheduler trigger, delay:%" PRId64 " ms", pTask->id.idStr, pTask->triggerParam); + qDebug("s-task:%s setup scheduler trigger, delay:%" PRId64 " ms", pTask->id.idStr, pTask->info.triggerParam); - pTask->schedTimer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer); - pTask->triggerStatus = TASK_TRIGGER_STATUS__INACTIVE; + pTask->schedInfo.pTimer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamEnv.timer); + pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE; } return 0; @@ -224,7 +224,7 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock destroyStreamDataBlock(pBlock); } else { ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH); - code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock); + code = taosWriteQitem(pTask->outputInfo.queue->pQueue, pBlock); if (code != 0) { qError("s-task:%s failed to put res into outputQ", pTask->id.idStr); } @@ -299,7 +299,7 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S return 0; } -void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); } +void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputInfo.status, TASK_INPUT_STATUS__FAILED); } void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) { int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 54f8b9b697..2a0940c4d0 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -155,7 +155,7 @@ static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStream pBlock->srcTaskId = pTask->id.taskId; pBlock->srcVgId = pTask->pMeta->vgId; - int32_t code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock); + int32_t code = taosWriteQitem(pTask->outputInfo.queue->pQueue, pBlock); if (code == 0) { streamDispatchStreamBlock(pTask); } else { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 64628f8c7f..cf04bcc1b8 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -493,7 +493,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH)); const char* id = pTask->id.idStr; - int32_t numOfElems = taosQueueItemSize(pTask->outputInfo.queue->queue); + int32_t numOfElems = taosQueueItemSize(pTask->outputInfo.queue->pQueue); if (numOfElems > 0) { qDebug("s-task:%s try to dispatch intermediate block to downstream, elem in outputQ:%d", id, numOfElems); } @@ -995,7 +995,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i // the input queue of the (down stream) task that receive the output data is full, // so the TASK_INPUT_STATUS_BLOCKED is rsp if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { - pTask->inputStatus = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream + pTask->inputInfo.status = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 " wait for %dms and retry dispatch data", id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS); @@ -1012,7 +1012,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i pTask->msgInfo.blockingTs = 0; // put data into inputQ of current task is also allowed - pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; + pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL; } // now ready for next data output @@ -1062,19 +1062,3 @@ int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* tEndDecode(pDecoder); return 0; } - -int32_t tEncodeStreamTaskUpdateRsp(SEncoder* pEncoder, const SStreamTaskNodeUpdateRsp* pMsg) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pMsg->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pMsg->taskId) < 0) return -1; - tEndEncode(pEncoder); - return pEncoder->pos; -} - -int32_t tDecodeStreamTaskUpdateRsp(SDecoder* pDecoder, SStreamTaskNodeUpdateRsp* pMsg) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pMsg->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pMsg->taskId) < 0) return -1; - tEndDecode(pDecoder); - return 0; -} \ No newline at end of file diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index a3cbc1fb4e..e6b112d050 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -83,7 +83,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i return 0; } - if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) { + if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) { qWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry", pTask->id.idStr); taosMsleep(1000); continue; @@ -192,7 +192,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize) { return 0; } - if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) { + if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) { qDebug("s-task:%s inputQ is blocked, wait for 10sec and retry", pTask->id.idStr); taosMsleep(10000); continue; @@ -249,8 +249,8 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize) { } int32_t streamTaskGetInputQItems(const SStreamTask* pTask) { - int32_t numOfItems1 = taosQueueItemSize(pTask->inputQueue->queue); - int32_t numOfItems2 = taosQallItemSize(pTask->inputQueue->qall); + int32_t numOfItems1 = taosQueueItemSize(pTask->inputInfo.queue->pQueue); + int32_t numOfItems2 = taosQallItemSize(pTask->inputInfo.queue->qall); return numOfItems1 + numOfItems2; } @@ -360,7 +360,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { // 7. pause allowed. streamTaskEnablePause(pStreamTask); - if (taosQueueEmpty(pStreamTask->inputQueue->queue)) { + if (taosQueueEmpty(pStreamTask->inputInfo.queue->pQueue)) { SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); @@ -472,7 +472,7 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock // agg task should dispatch trans-state msg to sink task, to flush all data to sink task. if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { pBlock->srcVgId = pTask->pMeta->vgId; - code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock); + code = taosWriteQitem(pTask->outputInfo.queue->pQueue, pBlock); if (code == 0) { streamDispatchStreamBlock(pTask); } else { @@ -615,7 +615,7 @@ int32_t streamTryExec(SStreamTask* pTask) { qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); - if (!(taosQueueEmpty(pTask->inputQueue->queue) || streamTaskShouldStop(&pTask->status) || + if (!(taosQueueEmpty(pTask->inputInfo.queue->pQueue) || streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status))) { streamSchedExec(pTask); } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 471804b6d6..ff5e9adaee 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -261,10 +261,10 @@ void streamMetaClear(SStreamMeta* pMeta) { SStreamTask* p = *(SStreamTask**)pIter; // release the ref by timer - if (p->triggerParam != 0 && p->info.fillHistory == 0) { // one more ref in timer + if (p->info.triggerParam != 0 && p->info.fillHistory == 0) { // one more ref in timer qDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", p->id.idStr, p->refCnt); - taosTmrStop(p->schedTimer); - p->triggerParam = 0; + taosTmrStop(p->schedInfo.pTimer); + p->info.triggerParam = 0; streamMetaReleaseTask(pMeta, p); } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 519551486b..7e6a438e12 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -19,7 +19,6 @@ #define MIN_STREAM_EXEC_BATCH_NUM 4 #define STREAM_TASK_INPUT_QUEUE_CAPACITY 20480 #define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30) -#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q) / ONE_MB_F) // todo refactor: // read data from input queue @@ -46,28 +45,28 @@ SStreamQueue* streamQueueOpen(int64_t cap) { return NULL; } - pQueue->queue = taosOpenQueue(); + pQueue->pQueue = taosOpenQueue(); pQueue->qall = taosAllocateQall(); - if (pQueue->queue == NULL || pQueue->qall == NULL) { - if (pQueue->queue) taosCloseQueue(pQueue->queue); + if (pQueue->pQueue == NULL || pQueue->qall == NULL) { + if (pQueue->pQueue) taosCloseQueue(pQueue->pQueue); if (pQueue->qall) taosFreeQall(pQueue->qall); taosMemoryFree(pQueue); return NULL; } pQueue->status = STREAM_QUEUE__SUCESS; - taosSetQueueCapacity(pQueue->queue, cap); - taosSetQueueMemoryCapacity(pQueue->queue, cap * 1024); + taosSetQueueCapacity(pQueue->pQueue, cap); + taosSetQueueMemoryCapacity(pQueue->pQueue, cap * 1024); return pQueue; } void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) { - qDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->queue, taosQueueItemSize(pQueue->queue)); + qDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->pQueue, taosQueueItemSize(pQueue->pQueue)); streamQueueCleanup(pQueue); taosFreeQall(pQueue->qall); - taosCloseQueue(pQueue->queue); + taosCloseQueue(pQueue->pQueue); taosMemoryFree(pQueue); } @@ -81,7 +80,7 @@ void* streamQueueNextItem(SStreamQueue* pQueue) { pQueue->qItem = NULL; taosGetQitem(pQueue->qall, &pQueue->qItem); if (pQueue->qItem == NULL) { - taosReadAllQitems(pQueue->queue, pQueue->qall); + taosReadAllQitems(pQueue->pQueue, pQueue->qall); taosGetQitem(pQueue->qall, &pQueue->qItem); } @@ -149,7 +148,7 @@ SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) { bool streamQueueIsFull(const STaosQueue* pQueue) { bool isFull = taosQueueItemSize((STaosQueue*) pQueue) >= STREAM_TASK_INPUT_QUEUE_CAPACITY; - double size = QUEUE_MEM_SIZE_IN_MB((STaosQueue*) pQueue); + double size = SIZE_IN_MB(taosQueueMemorySize((STaosQueue*) pQueue)); return (isFull || size >= STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE); } @@ -165,7 +164,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu return TSDB_CODE_SUCCESS; } - SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); + SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputInfo.queue); if (qItem == NULL) { qDebug("===stream===break batchSize:%d, %s", *numOfBlocks, id); return TSDB_CODE_SUCCESS; @@ -185,7 +184,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu return TSDB_CODE_SUCCESS; } - SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); + SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputInfo.queue); if (qItem == NULL) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) { taosMsleep(10); @@ -211,7 +210,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu } else { // previous existed blocks needs to be handle, before handle the checkpoint msg block qDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks); - streamQueueProcessFail(pTask->inputQueue); + streamQueueProcessFail(pTask->inputInfo.queue); return TSDB_CODE_SUCCESS; } } else { @@ -227,7 +226,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu tstrerror(terrno)); } - streamQueueProcessFail(pTask->inputQueue); + streamQueueProcessFail(pTask->inputInfo.queue); return TSDB_CODE_SUCCESS; } @@ -235,7 +234,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu } *numOfBlocks += 1; - streamQueueProcessSuccess(pTask->inputQueue); + streamQueueProcessSuccess(pTask->inputInfo.queue); if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) { qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM); @@ -246,13 +245,14 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu } int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) { - int8_t type = pItem->type; - int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1; - double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue); + int8_t type = pItem->type; + STaosQueue* pQueue = pTask->inputInfo.queue->pQueue; + int32_t total = taosQueueItemSize(pQueue) + 1; + double size = SIZE_IN_MB(taosQueueMemorySize(pQueue)); if (type == STREAM_INPUT__DATA_SUBMIT) { SStreamDataSubmit* px = (SStreamDataSubmit*)pItem; - if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pTask->inputQueue->queue)) { + if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pQueue)) { qError( "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data", pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); @@ -264,7 +264,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) int32_t msgLen = px->submit.msgLen; int64_t ver = px->submit.ver; - int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem); + int32_t code = taosWriteQitem(pQueue, pItem); if (code != TSDB_CODE_SUCCESS) { streamDataSubmitDestroy(px); taosFreeQitem(pItem); @@ -276,7 +276,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) msgLen, ver, total, size + SIZE_IN_MB(msgLen)); } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { - if (streamQueueIsFull(pTask->inputQueue->queue)) { + if (streamQueueIsFull(pQueue)) { qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); destroyStreamDataBlock((SStreamDataBlock*)pItem); @@ -284,27 +284,27 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) } qDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size); - int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem); + int32_t code = taosWriteQitem(pQueue, pItem); if (code != TSDB_CODE_SUCCESS) { destroyStreamDataBlock((SStreamDataBlock*)pItem); return code; } } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__TRANS_STATE) { - taosWriteQitem(pTask->inputQueue->queue, pItem); + taosWriteQitem(pQueue, pItem); qDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, pTask->info.taskLevel, streamGetBlockTypeStr(type), total, size); } else if (type == STREAM_INPUT__GET_RES) { // use the default memory limit, refactor later. - taosWriteQitem(pTask->inputQueue->queue, pItem); + taosWriteQitem(pQueue, pItem); qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); } else { ASSERT(0); } - if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) { - atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE); - qDebug("s-task:%s new data arrived, active the trigger, triggerStatus:%d", pTask->id.idStr, pTask->triggerStatus); + if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->info.triggerParam != 0) { + atomic_val_compare_exchange_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE); + qDebug("s-task:%s new data arrived, active the trigger, triggerStatus:%d", pTask->id.idStr, pTask->schedInfo.status); } return 0; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 01dcb435c0..bc719699d1 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -39,7 +39,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHisto pTask->id.streamId = streamId; pTask->info.taskLevel = taskLevel; pTask->info.fillHistory = fillHistory; - pTask->triggerParam = triggerParam; + pTask->info.triggerParam = triggerParam; char buf[128] = {0}; sprintf(buf, "0x%" PRIx64 "-%d", pTask->id.streamId, pTask->id.taskId); @@ -47,7 +47,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHisto pTask->id.idStr = taosStrdup(buf); pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY; - pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; + pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL; pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL; addToTaskset(pTaskList, pTask); @@ -133,7 +133,7 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tSerializeSUseDbRspImp(pEncoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1; if (tEncodeCStr(pEncoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1; } - if (tEncodeI64(pEncoder, pTask->triggerParam) < 0) return -1; + if (tEncodeI64(pEncoder, pTask->info.triggerParam) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; @@ -244,7 +244,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDeserializeSUseDbRspImp(pDecoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1; if (tDecodeCStrTo(pDecoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1; } - if (tDecodeI64(pDecoder, &pTask->triggerParam) < 0) return -1; + if (tDecodeI64(pDecoder, &pTask->info.triggerParam) < 0) return -1; tEndDecode(pDecoder); return 0; @@ -271,9 +271,9 @@ void tFreeStreamTask(SStreamTask* pTask) { taosMsleep(10); } - if (pTask->schedTimer != NULL) { - taosTmrStop(pTask->schedTimer); - pTask->schedTimer = NULL; + if (pTask->schedInfo.pTimer != NULL) { + taosTmrStop(pTask->schedInfo.pTimer); + pTask->schedInfo.pTimer = NULL; } if (pTask->launchTaskTimer != NULL) { @@ -282,8 +282,8 @@ void tFreeStreamTask(SStreamTask* pTask) { } int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus)); - if (pTask->inputQueue) { - streamQueueClose(pTask->inputQueue, pTask->id.taskId); + if (pTask->inputInfo.queue) { + streamQueueClose(pTask->inputInfo.queue, pTask->id.taskId); } if (pTask->outputInfo.queue) { @@ -353,16 +353,16 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i pTask->refCnt = 1; pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->status.timerActive = 0; - pTask->inputQueue = streamQueueOpen(512 << 10); + pTask->inputInfo.queue = streamQueueOpen(512 << 10); pTask->outputInfo.queue = streamQueueOpen(512 << 10); - if (pTask->inputQueue == NULL || pTask->outputInfo.queue == NULL) { + if (pTask->inputInfo.queue == NULL || pTask->outputInfo.queue == NULL) { qError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr); return -1; } pTask->tsInfo.init = taosGetTimestampMs(); - pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; + pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL; pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL; pTask->pMeta = pMeta; From 1709ab4849548265a9cc465c313e9a01dae9bfee Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 31 Aug 2023 13:52:37 +0800 Subject: [PATCH 02/19] fix: remove constant partition --- source/libs/parser/src/parTranslater.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 90dfc3a2fd..d2b954a11e 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3749,10 +3749,6 @@ static int32_t removeConstantValueFromList(SNodeList** pList) { static int32_t translatePartitionBy(STranslateContext* pCxt, SSelectStmt* pSelect) { pCxt->currClause = SQL_CLAUSE_PARTITION_BY; int32_t code = TSDB_CODE_SUCCESS; - - if (pSelect->pPartitionByList) { - code = removeConstantValueFromList(&pSelect->pPartitionByList); - } if (TSDB_CODE_SUCCESS == code && pSelect->pPartitionByList) { int8_t typeType = getTableTypeFromTableNode(pSelect->pFromTable); @@ -3964,6 +3960,11 @@ static int32_t translateSelectFrom(STranslateContext* pCxt, SSelectStmt* pSelect if (TSDB_CODE_SUCCESS == code) { code = replaceTbName(pCxt, pSelect); } + if (TSDB_CODE_SUCCESS == code) { + if (pSelect->pPartitionByList) { + code = removeConstantValueFromList(&pSelect->pPartitionByList); + } + } return code; } From 2285b834b0782a3f25659bb4c7dd195d34a75f0b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 31 Aug 2023 13:53:12 +0800 Subject: [PATCH 03/19] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 17 +++-------------- source/libs/stream/src/streamCheckpoint.c | 6 ++---- source/libs/stream/src/streamExec.c | 1 - source/libs/stream/src/streamQueue.c | 11 +++++++++++ source/libs/stream/src/streamRecover.c | 1 - 5 files changed, 16 insertions(+), 20 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 929a330e8a..506543e26e 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -50,7 +50,6 @@ enum { TASK_STATUS__HALT, // pause, but not be manipulated by user command TASK_STATUS__PAUSE, // pause TASK_STATUS__CK, // stream task is in checkpoint status, no data are allowed to put into inputQ anymore - TASK_STATUS__CK_READY, }; enum { @@ -190,19 +189,9 @@ void streamCleanUp(); SStreamQueue* streamQueueOpen(int64_t cap); void streamQueueClose(SStreamQueue* pQueue, int32_t taskId); - -static FORCE_INLINE void streamQueueProcessSuccess(SStreamQueue* queue) { - ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING); - queue->qItem = NULL; - atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS); -} - -static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) { - ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING); - atomic_store_8(&queue->status, STREAM_QUEUE__FAILED); -} - -void* streamQueueNextItem(SStreamQueue* pQueue); +void streamQueueProcessSuccess(SStreamQueue* queue); +void streamQueueProcessFail(SStreamQueue* queue); +void* streamQueueNextItem(SStreamQueue* pQueue); SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type); void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 2a0940c4d0..baf319d014 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -217,7 +217,6 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc } if (taskLevel == TASK_LEVEL__SINK) { - pTask->status.taskStatus = TASK_STATUS__CK_READY; qDebug("s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, send ready msg to upstream", id, num); streamFreeQitem((SStreamQueueItem*)pBlock); @@ -231,8 +230,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc // can start local checkpoint procedure pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask); - // if all upstreams are ready for generating checkpoint, set the status to be TASK_STATUS__CK_READY - // put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task + // Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task // already. And then, dispatch check point msg to all downstream tasks code = continueDispatchCheckpointBlock(pBlock, pTask); } @@ -314,7 +312,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1); ASSERT(remain >= 0); - if (remain == 0) { // all tasks are in TASK_STATUS__CK_READY state + if (remain == 0) { // all tasks are ready qDebug("s-task:%s is ready for checkpoint", pTask->id.idStr); pMeta->totalTasks = 0; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index e6b112d050..ff667fa778 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -577,7 +577,6 @@ int32_t streamExecForAll(SStreamTask* pTask) { // do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed. if (type == STREAM_INPUT__CHECKPOINT) { // ASSERT(pTask->status.taskStatus == TASK_STATUS__CK); -// pTask->status.taskStatus = TASK_STATUS__CK_READY; qDebug("s-task:%s checkpoint block received, set the status:%s", pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus)); streamTaskBuildCheckpoint(pTask); diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 7e6a438e12..34b0a00639 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -88,6 +88,17 @@ void* streamQueueNextItem(SStreamQueue* pQueue) { } } +void streamQueueProcessSuccess(SStreamQueue* queue) { + ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING); + queue->qItem = NULL; + atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS); +} + +void streamQueueProcessFail(SStreamQueue* queue) { + ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING); + atomic_store_8(&queue->status, STREAM_QUEUE__FAILED); +} + #if 0 bool streamQueueResEmpty(const SStreamQueueRes* pRes) { // diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index fda6333516..5dd6d9087b 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -65,7 +65,6 @@ const char* streamGetTaskStatusStr(int32_t status) { case TASK_STATUS__HALT: return "halt"; case TASK_STATUS__PAUSE: return "paused"; case TASK_STATUS__CK: return "check-point"; - case TASK_STATUS__CK_READY: return "check-point-ready"; case TASK_STATUS__DROPPING: return "dropping"; case TASK_STATUS__STOP: return "stop"; default:return ""; From e2bb64eb18e5b7d2e040d0e8a82d6c607aa6b711 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 31 Aug 2023 14:23:46 +0800 Subject: [PATCH 04/19] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 12 ++++---- source/dnode/vnode/CMakeLists.txt | 2 +- source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/tq/tq.c | 4 +-- .../src/tq/{tqRestore.c => tqStreamTask.c} | 28 ++++++++++--------- source/libs/stream/src/streamRecover.c | 8 +++--- 6 files changed, 28 insertions(+), 28 deletions(-) rename source/dnode/vnode/src/tq/{tqRestore.c => tqStreamTask.c} (93%) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 710069582c..908b250e61 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -248,7 +248,7 @@ typedef struct SStreamChildEpInfo { typedef struct SStreamTaskKey { int64_t streamId; - int64_t taskId; + int32_t taskId; } SStreamTaskKey; typedef struct SStreamTaskId { @@ -273,10 +273,10 @@ typedef struct SStreamStatus { int8_t pauseAllowed; // allowed task status to be set to be paused } SStreamStatus; -typedef struct SHistDataRange { +typedef struct SDataRange { SVersionRange range; STimeWindow window; -} SHistDataRange; +} SDataRange; typedef struct SSTaskBasicInfo { int32_t nodeId; // vgroup id or snode id @@ -309,7 +309,6 @@ typedef struct STaskInputInfo { typedef struct STaskSchedInfo { int8_t status; -// int64_t triggerParam; void* pTimer; } STaskSchedInfo; @@ -330,7 +329,7 @@ struct SStreamTask { SStreamStatus status; SCheckpointInfo chkInfo; STaskExec exec; - SHistDataRange dataRange; + SDataRange dataRange; SStreamTaskId historyTaskId; SStreamTaskId streamTaskId; STaskTimestamp tsInfo; @@ -419,6 +418,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsg int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo); int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem); +int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask); bool streamQueueIsFull(const STaosQueue* pQueue); typedef struct { @@ -681,8 +681,6 @@ int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* p int32_t streamSourceScanHistoryData(SStreamTask* pTask); int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); -int32_t appendTranstateIntoInputQ(SStreamTask* pTask); - // agg level int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask); int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 6c5eeb3424..b66d811284 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -64,7 +64,7 @@ set( "src/tq/tqPush.c" "src/tq/tqSink.c" "src/tq/tqCommit.c" - "src/tq/tqRestore.c" + "src/tq/tqStreamTask.c" "src/tq/tqSnapshot.c" "src/tq/tqOffsetSnapshot.c" "src/tq/tqStreamStateSnap.c" diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 2bc41e6b94..1146cfdc46 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -163,7 +163,7 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname); // tqStream int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); -int32_t tqScanWalForStreamTasks(STQ* pTq); +int32_t tqScanWal(STQ* pTq); int32_t tqCheckAndRunStreamTask(STQ* pTq); int32_t tqStopStreamTasks(STQ* pTq); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 5d47baad64..5b848b51bd 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1172,7 +1172,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { if (done) { pTask->tsInfo.step2Start = taosGetTimestampMs(); qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0); - appendTranstateIntoInputQ(pTask); + streamTaskPutTranstateIntoInputQ(pTask); streamTryExec(pTask); // exec directly } else { STimeWindow* pWindow = &pTask->dataRange.window; @@ -1346,7 +1346,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { } if (taskId == STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID) { // all tasks are extracted submit data from the wal - tqScanWalForStreamTasks(pTq); + tqScanWal(pTq); return 0; } diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqStreamTask.c similarity index 93% rename from source/dnode/vnode/src/tq/tqRestore.c rename to source/dnode/vnode/src/tq/tqStreamTask.c index 5efccc8f3c..3c0321f300 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -16,11 +16,12 @@ #include "tq.h" #include "vnd.h" -static int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle); -static int32_t doSetOffsetForWalReader(SStreamTask* pTask, int32_t vgId); +static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle); +static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId); +static void handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver); -// extract submit block from WAL, and add them into the input queue for the sources tasks. -int32_t tqScanWalForStreamTasks(STQ* pTq) { +// extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks. +int32_t tqScanWal(STQ* pTq) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; int64_t st = taosGetTimestampMs(); @@ -31,7 +32,7 @@ int32_t tqScanWalForStreamTasks(STQ* pTq) { // check all tasks bool shouldIdle = true; - createStreamTaskRunReq(pTq->pStreamMeta, &shouldIdle); + doScanWalForAllTasks(pTq->pStreamMeta, &shouldIdle); int32_t times = 0; @@ -140,7 +141,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; - // for follower or vnode does not restored, do not launch the stream tasks. + // do not launch the stream tasks, if it is a follower or not restored vnode. if (!(vnodeIsRoleLeader(pTq->pVnode) && pTq->pVnode->restored)) { return TSDB_CODE_SUCCESS; } @@ -223,7 +224,7 @@ int32_t tqStopStreamTasks(STQ* pTq) { return 0; } -int32_t doSetOffsetForWalReader(SStreamTask* pTask, int32_t vgId) { +int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) { // seek the stored version and extract data from WAL int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader); if (pTask->chkInfo.currentVer < firstVer) { @@ -267,7 +268,8 @@ int32_t doSetOffsetForWalReader(SStreamTask* pTask, int32_t vgId) { return TSDB_CODE_SUCCESS; } -static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) { +// todo handle memory error +void handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) { const char* id = pTask->id.idStr; int64_t maxVer = pTask->dataRange.range.maxVer; @@ -279,7 +281,7 @@ static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) { double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0; qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); - appendTranstateIntoInputQ(pTask); + /*int32_t code = */streamTaskPutTranstateIntoInputQ(pTask); /*int32_t code = */ streamSchedExec(pTask); } else { qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 ", not scan wal", @@ -288,7 +290,7 @@ static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) { } } -int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { +int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { *pScanIdle = true; bool noDataInWal = true; int32_t vgId = pStreamMeta->vgId; @@ -356,7 +358,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { *pScanIdle = false; // seek the stored version and extract data from WAL - int32_t code = doSetOffsetForWalReader(pTask, vgId); + int32_t code = setWalReaderStartOffset(pTask, vgId); if (code != TSDB_CODE_SUCCESS) { streamMetaReleaseTask(pStreamMeta, pTask); continue; @@ -369,7 +371,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, pTask->id.idStr); if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItems == 0)) { // failed, continue - checkForFillHistoryVerRange(pTask, walReaderGetCurrentVer(pTask->exec.pWalReader)); + handleFillhistoryScanComplete(pTask, walReaderGetCurrentVer(pTask->exec.pWalReader)); streamMetaReleaseTask(pStreamMeta, pTask); continue; } @@ -390,7 +392,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { if (code == TSDB_CODE_SUCCESS) { int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader); pTask->chkInfo.currentVer = ver; - checkForFillHistoryVerRange(pTask, ver); + handleFillhistoryScanComplete(pTask, ver); tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, ver); } else { tqError("s-task:%s append input queue failed, too many in inputQ, ver:%" PRId64, pTask->id.idStr, diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 5dd6d9087b..4b86b9713c 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -108,7 +108,7 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) { // check status static int32_t doCheckDownstreamStatus(SStreamTask* pTask) { - SHistDataRange* pRange = &pTask->dataRange; + SDataRange* pRange = &pTask->dataRange; STimeWindow* pWindow = &pRange->window; SStreamTaskCheckReq req = { @@ -365,7 +365,7 @@ int32_t streamSourceScanHistoryData(SStreamTask* pTask) { return streamScanExec(pTask, 100); } -int32_t appendTranstateIntoInputQ(SStreamTask* pTask) { +int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) { SStreamDataBlock* pTranstate = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock)); if (pTranstate == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -764,7 +764,7 @@ int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistory void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { if (pTask->historyTaskId.taskId == 0) { - SHistDataRange* pRange = &pTask->dataRange; + SDataRange* pRange = &pTask->dataRange; if (pTask->info.fillHistory == 1) { qDebug("s-task:%s fill-history task, time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64 "-%" PRId64, @@ -775,7 +775,7 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer); } } else { - SHistDataRange* pRange = &pTask->dataRange; + SDataRange* pRange = &pTask->dataRange; int64_t ekey = 0; if (pRange->window.ekey < INT64_MAX) { From 92e258617a4d8ef562e43b71afc592297a340ac2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 31 Aug 2023 14:31:45 +0800 Subject: [PATCH 05/19] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tqStreamTaskSnap.c | 1 - source/dnode/vnode/src/vnd/vnodeSnapshot.c | 25 ++++++++++---------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c index 20d25dbceb..2f41a7c1bb 100644 --- a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -89,7 +89,6 @@ int32_t streamTaskSnapReaderClose(SStreamTaskReader* pReader) { taosArrayDestroy(pReader->tdbTbList); tdbTbcClose(pReader->pCur); taosMemoryFree(pReader); - return code; } diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index bfddeedd78..f19068ea88 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -87,6 +87,7 @@ void vnodeSnapReaderClose(SVSnapReader *pReader) { int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) { int32_t code = 0; SVnode *pVnode = pReader->pVnode; + int32_t vgId = TD_VID(pReader->pVnode); // CONFIG ============== // FIXME: if commit multiple times and the config changed? @@ -220,30 +221,30 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) } // STREAM ============ - vInfo("stream task start"); + vInfo("vgId:%d stream task start", vgId); if (!pReader->streamTaskDone) { if (pReader->pStreamTaskReader == NULL) { - vInfo("stream task start 1"); code = streamTaskSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->sver, &pReader->pStreamTaskReader); if (code) { - vInfo("stream task start err"); + vError("vgId:%d open streamtask snapshot reader failed, code:%s", vgId, tstrerror(code)); goto _err; } } + code = streamTaskSnapRead(pReader->pStreamTaskReader, ppData); - vInfo("stream task start 2"); if (code) { - vInfo("stream task start 3"); + vError("vgId:%d error happens during read data from streatask snapshot, code:%s", vgId, tstrerror(code)); goto _err; } else { if (*ppData) { + vInfo("vgId:%d no streamTask snapshot", vgId); goto _exit; - vInfo("stream task start 4"); } else { pReader->streamTaskDone = 1; code = streamTaskSnapReaderClose(pReader->pStreamTaskReader); - vInfo("stream task start 5"); - if (code) goto _err; + if (code) { + goto _err; + } pReader->pStreamTaskReader = NULL; } } @@ -305,15 +306,15 @@ _exit: pReader->index++; *nData = sizeof(SSnapDataHdr) + pHdr->size; pHdr->index = pReader->index; - vDebug("vgId:%d, vnode snapshot read data, index:%" PRId64 " type:%d blockLen:%d ", TD_VID(pReader->pVnode), - pReader->index, pHdr->type, *nData); + vDebug("vgId:%d, vnode snapshot read data, index:%" PRId64 " type:%d blockLen:%d ", vgId, pReader->index, + pHdr->type, *nData); } else { - vInfo("vgId:%d, vnode snapshot read data end, index:%" PRId64, TD_VID(pReader->pVnode), pReader->index); + vInfo("vgId:%d, vnode snapshot read data end, index:%" PRId64, vgId, pReader->index); } return code; _err: - vError("vgId:%d, vnode snapshot read failed since %s", TD_VID(pReader->pVnode), tstrerror(code)); + vError("vgId:%d, vnode snapshot read failed since %s", vgId, tstrerror(code)); return code; } From 9b7bbe01682d2ade61defbf772b3affb155e2546 Mon Sep 17 00:00:00 2001 From: dmchen Date: Thu, 31 Aug 2023 14:42:25 +0800 Subject: [PATCH 06/19] audit table name --- source/dnode/mnode/impl/src/mndStb.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 4e384faf4c..8484148642 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -2612,9 +2612,9 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) { dropReq.igNotExists, dropReq.source); SName name = {0}; - tNameFromString(&name, pDb->name, T_NAME_ACCT | T_NAME_DB); + tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - auditRecord(pReq, pMnode->clusterId, "dropStb", name.dbname, dropReq.name, detail); + auditRecord(pReq, pMnode->clusterId, "dropStb", name.dbname, name.tname, detail); _OVER: if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { From 9fd0bff673e948659bff7ff9377c571855239504 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 31 Aug 2023 14:57:26 +0800 Subject: [PATCH 07/19] fix invalid insert --- source/dnode/vnode/src/tq/tqStreamTaskSnap.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c index 20d25dbceb..797c968dcd 100644 --- a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -251,11 +251,13 @@ int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t } tDecoderClear(&decoder); // tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn) - if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), - (uint8_t*)pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr), pWriter->txn) < 0) { + int64_t key[2] = {pTask->id.streamId, pTask->id.taskId}; + if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr), + nData - sizeof(SSnapDataHdr), pWriter->txn) < 0) { taosMemoryFree(pTask); return -1; } + // mem leak or not ? taosMemoryFree(pTask); } else if (pHdr->type == SNAP_DATA_STREAM_TASK_CHECKPOINT) { // do nothing From 1833be028fa677daddf1d194e232753a4327e36a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 31 Aug 2023 15:28:31 +0800 Subject: [PATCH 08/19] fix invalid insert --- include/libs/stream/tstream.h | 1 + source/dnode/vnode/src/tq/tqStreamTaskSnap.c | 14 ++++---------- source/libs/stream/src/streamTask.c | 14 +++++++++++++- 3 files changed, 18 insertions(+), 11 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index d880ae202e..f46a814c44 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -423,6 +423,7 @@ void tFreeStreamTask(SStreamTask* pTask); int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver); int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo); +int32_t tDecodeStreamTaskId(SDecoder* pDecoder, SStreamTaskId* pTaskId); int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem); bool streamQueueIsFull(const STaosQueue* pQueue); diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c index 797c968dcd..86433e4652 100644 --- a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -236,29 +236,23 @@ int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t STqHandle handle; SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; if (pHdr->type == SNAP_DATA_STREAM_TASK) { - SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); - if (pTask == NULL) { - return -1; - } + SStreamTaskId task = {0}; SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); - code = tDecodeStreamTask(&decoder, pTask); + + code = tDecodeStreamTaskId(&decoder, &task); if (code < 0) { tDecoderClear(&decoder); - taosMemoryFree(pTask); goto _err; } tDecoderClear(&decoder); // tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn) - int64_t key[2] = {pTask->id.streamId, pTask->id.taskId}; + int64_t key[2] = {task.streamId, task.taskId}; if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr), pWriter->txn) < 0) { - taosMemoryFree(pTask); return -1; } - // mem leak or not ? - taosMemoryFree(pTask); } else if (pHdr->type == SNAP_DATA_STREAM_TASK_CHECKPOINT) { // do nothing } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 6eb09b95ec..247e79ee8c 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -251,6 +251,18 @@ int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) tEndDecode(pDecoder); return 0; } +int32_t tDecodeStreamTaskId(SDecoder* pDecoder, SStreamTaskId* pTaskId) { + int64_t ver; + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &ver) < 0) return -1; + if (ver != SSTREAM_TASK_VER) return -1; + + if (tDecodeI64(pDecoder, &pTaskId->streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pTaskId->taskId) < 0) return -1; + + tEndDecode(pDecoder); + return 0; +} static void freeItem(void* p) { SStreamContinueExecInfo* pInfo = p; @@ -538,4 +550,4 @@ void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) { } qDebug("s-task:%s reset all upstream tasks stage info", pTask->id.idStr); -} \ No newline at end of file +} From 9953a400ec6ecbdf26726ed42c97bc1787f145a2 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 31 Aug 2023 15:54:46 +0800 Subject: [PATCH 09/19] change chkp interval --- source/common/src/tglobal.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 02c56cc40f..ff9e922ee1 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -128,7 +128,7 @@ int32_t tsQueryPolicy = 1; int32_t tsQueryRspPolicy = 0; int64_t tsQueryMaxConcurrentTables = 200; // unit is TSDB_TABLE_NUM_UNIT bool tsEnableQueryHb = true; -bool tsEnableScience = false; // on taos-cli show float and doulbe with scientific notation if true +bool tsEnableScience = false; // on taos-cli show float and doulbe with scientific notation if true int32_t tsQuerySmaOptimize = 0; int32_t tsQueryRsmaTolerance = 1000; // the tolerance time (ms) to judge from which level to query rsma data. bool tsQueryPlannerTrace = false; @@ -240,11 +240,11 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch // internal int32_t tsTransPullupInterval = 2; int32_t tsMqRebalanceInterval = 2; -int32_t tsStreamCheckpointTickInterval = 20; +int32_t tsStreamCheckpointTickInterval = 600; int32_t tsStreamNodeCheckInterval = 10; int32_t tsTtlUnit = 86400; int32_t tsTtlPushIntervalSec = 10; -int32_t tsTrimVDbIntervalSec = 60 * 60; // interval of trimming db in all vgroups +int32_t tsTrimVDbIntervalSec = 60 * 60; // interval of trimming db in all vgroups int32_t tsGrantHBInterval = 60; int32_t tsUptimeInterval = 300; // seconds char tsUdfdResFuncs[512] = ""; // udfd resident funcs that teardown when udfd exits From e10846d5eb4c819e31cf78ed8cb9e655d9eb1755 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 31 Aug 2023 16:33:26 +0800 Subject: [PATCH 10/19] fix(mxml): use v2.12 instead of 2.10 --- cmake/mxml_CMakeLists.txt.in | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/mxml_CMakeLists.txt.in b/cmake/mxml_CMakeLists.txt.in index 9dcb5df665..1ac90ebdd4 100644 --- a/cmake/mxml_CMakeLists.txt.in +++ b/cmake/mxml_CMakeLists.txt.in @@ -1,7 +1,7 @@ # cos ExternalProject_Add(mxml GIT_REPOSITORY https://github.com/michaelrsweet/mxml.git - GIT_TAG release-2.10 + GIT_TAG release-2.12 SOURCE_DIR "${TD_CONTRIB_DIR}/mxml" #BINARY_DIR "" BUILD_IN_SOURCE TRUE From 12ffc58b7ea7b09b9842de388ef4b3e63a1df4fe Mon Sep 17 00:00:00 2001 From: dmchen Date: Thu, 31 Aug 2023 16:37:24 +0800 Subject: [PATCH 11/19] password and topic name --- source/dnode/mnode/impl/src/mndTopic.c | 6 +++++- source/dnode/mnode/impl/src/mndUser.c | 7 +++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index fc52e4657e..0b243e0a9c 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -853,7 +853,11 @@ end: char detail[100] = {0}; sprintf(detail, "igNotExists:%d", dropReq.igNotExists); - auditRecord(pReq, pMnode->clusterId, "dropTopic", dropReq.name, "", detail); + SName name = {0}; + tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB); + //reuse this function for topic + + auditRecord(pReq, pMnode->clusterId, "dropTopic", name.dbname, "", detail); return TSDB_CODE_ACTION_IN_PROGRESS; } diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 4b59debc26..b107442199 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -1039,11 +1039,14 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) { if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; char detail[1000] = {0}; - sprintf(detail, "alterType:%s, enable:%d, superUser:%d, sysInfo:%d, tabName:%s", + sprintf(detail, "alterType:%s, enable:%d, superUser:%d, sysInfo:%d, tabName:%s, password:", mndUserAuditTypeStr(alterReq.alterType), alterReq.enable, alterReq.superUser, alterReq.sysInfo, alterReq.tabName); if(alterReq.alterType == TSDB_ALTER_USER_PASSWD){ - auditRecord(pReq, pMnode->clusterId, "changePassword", alterReq.user, "", detail); + sprintf(detail, "alterType:%s, enable:%d, superUser:%d, sysInfo:%d, tabName:%s, password:xxx", + mndUserAuditTypeStr(alterReq.alterType), alterReq.enable, alterReq.superUser, alterReq.sysInfo, + alterReq.tabName); + auditRecord(pReq, pMnode->clusterId, "alterUser", alterReq.user, "", detail); } else if(alterReq.alterType == TSDB_ALTER_USER_SUPERUSER || alterReq.alterType == TSDB_ALTER_USER_ENABLE || From 52269e98d21cae4a113352d58955638d9573b256 Mon Sep 17 00:00:00 2001 From: liuyao <38781207+54liuyao@users.noreply.github.com> Date: Thu, 31 Aug 2023 16:43:57 +0800 Subject: [PATCH 12/19] Update 14-stream.md --- docs/zh/12-taos-sql/14-stream.md | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/zh/12-taos-sql/14-stream.md b/docs/zh/12-taos-sql/14-stream.md index 43bdc9b04a..929cf9ee4e 100644 --- a/docs/zh/12-taos-sql/14-stream.md +++ b/docs/zh/12-taos-sql/14-stream.md @@ -201,7 +201,6 @@ TDengine 对于修改数据提供两种处理方式,由 IGNORE UPDATE 选项 对于已经存在的超级表,检查列的schema信息 1. 检查列的schema信息是否匹配,对于不匹配的,则自动进行类型转换,当前只有数据长度大于4096byte时才报错,其余场景都能进行类型转换。 2. 检查列的个数是否相同,如果不同,需要显示的指定超级表与subquery的列的对应关系,否则报错;如果相同,可以指定对应关系,也可以不指定,不指定则按位置顺序对应。 -3. 至少自定义一个tag,否则报错。详见 自定义TAG ## 自定义TAG From 03de7ad5a65daad285d78138cd6905361fae3695 Mon Sep 17 00:00:00 2001 From: dmchen Date: Thu, 31 Aug 2023 16:44:36 +0800 Subject: [PATCH 13/19] stream name --- source/dnode/mnode/impl/src/mndStream.c | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 46eb0d9957..9455aae8e3 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -888,7 +888,11 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { createStreamReq.lastTs, createStreamReq.maxDelay, createStreamReq.numOfTags, createStreamReq.sourceDB, createStreamReq.targetStbFullName, createStreamReq.triggerType, createStreamReq.watermark); - auditRecord(pReq, pMnode->clusterId, "createStream", createStreamReq.name, "", detail); + SName name = {0}; + tNameFromString(&name, createStreamReq.name, T_NAME_ACCT | T_NAME_DB); + //reuse this function for stream + + auditRecord(pReq, pMnode->clusterId, "createStream", name.dbname, "", detail); _OVER: if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { @@ -1322,7 +1326,11 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { char detail[100] = {0}; sprintf(detail, "igNotExists:%d", dropReq.igNotExists); - auditRecord(pReq, pMnode->clusterId, "dropStream", dropReq.name, "", detail); + SName name = {0}; + tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB); + //reuse this function for stream + + auditRecord(pReq, pMnode->clusterId, "dropStream", name.dbname, "", detail); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); From ef9d137df2eabdf3799ccaaf5679ae26201d5905 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 1 Sep 2023 11:02:21 +0800 Subject: [PATCH 14/19] fix: code merge issue --- source/libs/transport/src/transCli.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 4c9cd739ff..aa264f3126 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -2158,7 +2158,7 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { STraceId* trace = &pMsg->msg.info.traceId; char* tbuf = taosMemoryCalloc(1, TSDB_FQDN_LEN * 5); - EPSET_DEBUG_STR(&pCtx->epSet, tbuf); + EPSET_TO_STR(&pCtx->epSet, tbuf); tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", transLabel(pThrd->pTransInst), tbuf, pCtx->retryStep, pCtx->retryNextInterval); taosMemoryFree(tbuf); From e7b467df9983daefe8ca9e5999c6135b327bed69 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 1 Sep 2023 11:13:06 +0800 Subject: [PATCH 15/19] fix: getSlotKey param issue --- source/libs/planner/src/planPhysiCreater.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 0600d1919c..e174753a97 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -887,7 +887,7 @@ static int32_t createHashJoinColList(int16_t lBlkId, int16_t rBlkId, SNode* pEq1 static int32_t sortHashJoinTargets(int16_t lBlkId, int16_t rBlkId, SHashJoinPhysiNode* pJoin) { SNode* pNode = NULL; - char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN]; + char name[TSDB_COL_FNAME_LEN + 1] = {0}; SSHashObj* pHash = tSimpleHashInit(pJoin->pTargets->length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY)); if (NULL == pHash) { return TSDB_CODE_OUT_OF_MEMORY; From 8742f7c928078d1eaaa4ff23fd0c3d7e57c42fd4 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 1 Sep 2023 11:15:06 +0800 Subject: [PATCH 16/19] fix: add new param --- source/libs/planner/src/planPhysiCreater.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index e174753a97..52c3546a39 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -896,7 +896,7 @@ static int32_t sortHashJoinTargets(int16_t lBlkId, int16_t rBlkId, SHashJoinPhys FOREACH(pNode, pJoin->pTargets) { SColumnNode* pCol = (SColumnNode*)pNode; - int32_t len = getSlotKey(pNode, NULL, name); + int32_t len = getSlotKey(pNode, NULL, name, TSDB_COL_FNAME_LEN); tSimpleHashPut(pHash, name, len, &pCol, POINTER_BYTES); } @@ -905,7 +905,7 @@ static int32_t sortHashJoinTargets(int16_t lBlkId, int16_t rBlkId, SHashJoinPhys FOREACH(pNode, pJoin->pOnLeft) { SColumnNode* pCol = (SColumnNode*)pNode; - int32_t len = getSlotKey(pNode, NULL, name); + int32_t len = getSlotKey(pNode, NULL, name, TSDB_COL_FNAME_LEN); SNode** p = tSimpleHashGet(pHash, name, len); if (p) { nodesListStrictAppend(pJoin->pTargets, *p); @@ -914,7 +914,7 @@ static int32_t sortHashJoinTargets(int16_t lBlkId, int16_t rBlkId, SHashJoinPhys } FOREACH(pNode, pJoin->pOnRight) { SColumnNode* pCol = (SColumnNode*)pNode; - int32_t len = getSlotKey(pNode, NULL, name); + int32_t len = getSlotKey(pNode, NULL, name, TSDB_COL_FNAME_LEN); SNode** p = tSimpleHashGet(pHash, name, len); if (p) { nodesListStrictAppend(pJoin->pTargets, *p); From 2af8a617e7d59d2fc02772e3edd898048c4dea03 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 1 Sep 2023 15:08:09 +0800 Subject: [PATCH 17/19] fix: add log to debug --- source/libs/tfs/src/tfsTier.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/source/libs/tfs/src/tfsTier.c b/source/libs/tfs/src/tfsTier.c index a24e3cf021..1c47182e2a 100644 --- a/source/libs/tfs/src/tfsTier.c +++ b/source/libs/tfs/src/tfsTier.c @@ -116,7 +116,11 @@ int32_t tfsAllocDiskOnTier(STfsTier *pTier) { if (pDisk == NULL) continue; - if (pDisk->size.avail < tsMinDiskFreeSize) continue; + if (pDisk->size.avail < tsMinDiskFreeSize) { + uInfo("disk %s is full and skip it, level:%d id:%d free size:%" PRId64 " min free size:%" PRId64, pDisk->path, + pDisk->level, pDisk->id, pDisk->size.avail, tsMinDiskFreeSize); + continue; + } retId = diskId; terrno = 0; From b2fb99741fa4fe03d26ee1fd06eda6f27a985e55 Mon Sep 17 00:00:00 2001 From: Wade Zhang Date: Fri, 1 Sep 2023 18:53:43 +0800 Subject: [PATCH 18/19] doc: remove cluster.md and storage.md --- docs/zh/17-operation/07-cluster.md | 80 ------------------------------ docs/zh/17-operation/09-storage.md | 56 --------------------- 2 files changed, 136 deletions(-) delete mode 100644 docs/zh/17-operation/07-cluster.md delete mode 100644 docs/zh/17-operation/09-storage.md diff --git a/docs/zh/17-operation/07-cluster.md b/docs/zh/17-operation/07-cluster.md deleted file mode 100644 index cf4bfafd53..0000000000 --- a/docs/zh/17-operation/07-cluster.md +++ /dev/null @@ -1,80 +0,0 @@ ---- -title: 集群运维 -description: TDengine 提供了多种集群运维手段以使集群运行更健康更高效 ---- - -为了使集群运行更健康更高效,TDengine 企业版提供了一些运维手段来帮助系统管理员更好地运维集群。 - -## 数据重整 - -TDengine 面向多种写入场景,在有些写入场景下,TDengine 的存储会导致数据存储的放大或数据文件的空洞等。这一方面影响数据的存储效率,另一方面也会影响查询效率。为了解决上述问题,TDengine 企业版提供了对数据的重整功能,即 DATA COMPACT 功能,将存储的数据文件重新整理,删除文件空洞和无效数据,提高数据的组织度,从而提高存储和查询的效率。 - -**语法** - -```sql -COMPACT DATABASE db_name [start with 'XXXX'] [end with 'YYYY']; -``` - -**效果** - -- 扫描并压缩指定的 DB 中所有 VGROUP 中 VNODE 的所有数据文件 -- COMPCAT 会删除被删除数据以及被删除的表的数据 -- COMPACT 会合并多个 STT 文件 -- 可通过 start with 关键字指定 COMPACT 数据的起始时间 -- 可通过 end with 关键字指定 COMPACT 数据的终止时间 - -**补充说明** - -- COMPACT 为异步,执行 COMPACT 命令后不会等 COMPACT 结束就会返回。如果上一个 COMPACT 没有完成则再发起一个 COMPACT 任务,则会等上一个任务完成后再返回。 -- COMPACT 可能阻塞写入,但不阻塞查询 -- COMPACT 的进度不可观测 - -## 集群负载再平衡 - -当多副本集群中的一个或多个节点因为升级或其它原因而重启后,有可能出现集群中各个 dnode 负载不均衡的现象,极端情况下会出现所有 vgroup 的 leader 都位于同一个 dnode 的情况。为了解决这个问题,可以使用下面的命令 - -```sql -balance vgroup leader; -``` - -**功能** - -让所有的 vgroup 的 leade r在各自的replica节点上均匀分布。这个命令会让 vgroup 强制重新选举,通过重新选举,在选举的过程中,变换 vgroup 的leader,通过这个方式,最终让leader均匀分布。 - -**注意** - -Raft选举本身带有随机性,所以通过选举的重新分布产生的均匀分布也是带有一定的概率,不会完全的均匀。**该命令的副作用是影响查询和写入**,在vgroup重新选举时,从开始选举到选举出新的 leader 这段时间,这 个vgroup 无法写入和查询。选举过程一般在秒级完成。所有的vgroup会依次逐个重新选举。 - -## 恢复数据节点 - -在多节点三副本的集群环境中,如果某个 dnode 的磁盘损坏,该 dnode 会自动退出,但集群中其它的 dnode 仍然能够继续提供写入和查询服务。 - -在更换了损坏的磁盘后,如果想要让曾经主动退出的 dnode 重新加入集群提供服务,可以通过 `restore dnode` 命令来恢复该数据节点上的部分或全部逻辑节点,该功能依赖多副本中的其它副本进行数据复制,所以只在集群中 dnode 数量大于等于 3 且副本数为 3 的情况下能够工作。 - - -```sql -restore dnode ;# 恢复dnode上的mnode,所有vnode和qnode -restore mnode on dnode ;# 恢复dnode上的mnode -restore vnode on dnode ;# 恢复dnode上的所有vnode -restore qnode on dnode ;# 恢复dnode上的qnode -``` - -**限制** -- 该功能是基于已有的复制功能的恢复,不是灾难恢复或者备份恢复,所以对于要恢复的 mnode 和 vnode来说,使用该命令的前提是还存在该 mnode 或 vnode 的其它两个副本仍然能够正常工作。 -- 该命令不能修复数据目录中的个别文件的损坏或者丢失。例如,如果某个 mnode 或者 vnode 中的个别文件或数据损坏,无法单独恢复损坏的某个文件或者某块数据。此时,可以选择将该 mnode/vnode 的数据全部清空再进行恢复。 - - -## 虚拟组分裂 (Scale Out) - -当一个 vgroup 因为子表数过多而导致 CPU 或 Disk 资源使用量负载过高时,增加 dnode 节点后,可通过 `split vgroup` 命令把该 vgroup 分裂为两个虚拟组。分裂完成后,新产生的两个 vgroup 承担原来由一个 vgroup 提供的读写服务。这也是 TDengine 为企业版用户提供的 scale out 集群的能力。 - -```sql -split vgroup -``` - -**注意** -- 单副本库虚拟组,在分裂完成后,历史时序数据总磁盘空间使用量,可能会翻倍。所以,在执行该操作之前,通过增加 dnode 节点方式,确保集群中有足够的 CPU 和磁盘资源,避免资源不足现象发生。 -- 该命令为 DB 级事务;执行过程,当前DB的其它管理事务将会被拒绝。集群中,其它DB不受影响。 -- 分裂任务执行过程中,可持续提供读写服务;期间,可能存在可感知的短暂的读写业务中断。 -- 在分裂过程中,不支持流和订阅。分裂结束后,历史 WAL 会清空。 -- 分裂过程中,可支持节点宕机重启容错;但不支持节点磁盘故障容错。 \ No newline at end of file diff --git a/docs/zh/17-operation/09-storage.md b/docs/zh/17-operation/09-storage.md deleted file mode 100644 index 185b2c40ec..0000000000 --- a/docs/zh/17-operation/09-storage.md +++ /dev/null @@ -1,56 +0,0 @@ ---- -title: 多级存储 ---- - -## 多级存储 - -说明:多级存储功能仅企业版支持。 - -在默认配置下,TDengine 会将所有数据保存在 /var/lib/taos 目录下,而且每个 vnode 的数据文件保存在该目录下的不同目录。为扩大存储空间,尽量减少文件读取的瓶颈,提高数据吞吐率 TDengine 可通过配置系统参数 dataDir 让多个挂载的硬盘被系统同时使用。 - -除此之外,TDengine 也提供了数据分级存储的功能,将不同时间段的数据存储在挂载的不同介质上的目录里,从而实现不同“热度”的数据存储在不同的存储介质上,充分利用存储,节约成本。比如,最新采集的数据需要经常访问,对硬盘的读取性能要求高,那么用户可以配置将这些数据存储在 SSD 盘上。超过一定期限的数据,查询需求量没有那么高,那么可以存储在相对便宜的 HDD 盘上。 - -多级存储支持 3 级,每级最多可配置 16 个挂载点。 - -TDengine 多级存储配置方式如下(在配置文件/etc/taos/taos.cfg 中): - -``` -dataDir [path] -``` - -- path: 挂载点的文件夹路径 -- level: 介质存储等级,取值为 0,1,2。 - 0 级存储最新的数据,1 级存储次新的数据,2 级存储最老的数据,省略默认为 0。 - 各级存储之间的数据流向:0 级存储 -> 1 级存储 -> 2 级存储。 - 同一存储等级可挂载多个硬盘,同一存储等级上的数据文件分布在该存储等级的所有硬盘上。 - 需要说明的是,数据在不同级别的存储介质上的移动,是由系统自动完成的,用户无需干预。 -- primary: 是否为主挂载点,0(否)或 1(是),省略默认为 1。 - -在配置中,只允许一个主挂载点的存在(level=0,primary=1),例如采用如下的配置方式: - -``` -dataDir /mnt/data1 0 1 -dataDir /mnt/data2 0 0 -dataDir /mnt/data3 1 0 -dataDir /mnt/data4 1 0 -dataDir /mnt/data5 2 0 -dataDir /mnt/data6 2 0 -``` - -:::note - -1. 多级存储不允许跨级配置,合法的配置方案有:仅 0 级,仅 0 级+ 1 级,以及 0 级+ 1 级+ 2 级。而不允许只配置 level=0 和 level=2,而不配置 level=1。 -2. 禁止手动移除使用中的挂载盘,挂载盘目前不支持非本地的网络盘。 -3. 多级存储目前不支持删除已经挂载的硬盘的功能。 - -::: - -## 0 级负载均衡 - -在多级存储中,有且只有一个主挂载点,主挂载点承担了系统中最重要的元数据在座,同时各个 vnode 的主目录均存在于当前 dnode 主挂载点上,从而导致该 dnode 的写入性能受限于单个磁盘的 IO 吞吐能力。 - -从 TDengine 3.1.0.0 开始,如果一个 dnode 配置了多个 0 级挂载点,我们将该 dnode 上所有 vnode 的主目录均衡分布在所有的 0 级挂载点上,由这些 0 级挂载点共同承担写入负荷。在网络 I/O 及其它处理资源不成为瓶颈的情况下,通过优化集群配置,测试结果证明整个系统的写入能力和 0 级挂载点的数量呈现线性关系,即随着 0 级挂载点数量的增加,整个系统的写入能力也成倍增加。 - -## 同级挂载点选择策略 - -一般情况下,当 TDengine 要从同级挂载点中选择一个用于生成新的数据文件时,采用 round robin 策略进行选择。但现实中有可能每个磁盘的容量不相同,或者容量相同但写入的数据量不相同,这就导致会出现每个磁盘上的可用空间不均衡,在实际进行选择时有可能会选择到一个剩余空间已经很小的磁盘。为了解决这个问题,从 3.1.1.0 开始引入了一个新的配置 `minDiskFreeSize`,当某块磁盘上的可用空间小于等于这个阈值时,该磁盘将不再被选择用于生成新的数据文件。该配置项的单位为字节,其值应该大于 2GB,即会跳过可用空间小于 2GB 的挂载点。 From 6a11bfb6e4780b8c31634c3b3553bd0407d5173e Mon Sep 17 00:00:00 2001 From: Ping Xiao Date: Fri, 1 Sep 2023 19:01:48 +0800 Subject: [PATCH 19/19] release docs for 3.1.1.0 --- cmake/cmake.version | 2 +- docs/en/28-releases/01-tdengine.md | 4 ++++ docs/zh/28-releases/01-tdengine.md | 4 ++++ 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/cmake/cmake.version b/cmake/cmake.version index a6bf90fa3c..0e4785f643 100644 --- a/cmake/cmake.version +++ b/cmake/cmake.version @@ -2,7 +2,7 @@ IF (DEFINED VERNUMBER) SET(TD_VER_NUMBER ${VERNUMBER}) ELSE () - SET(TD_VER_NUMBER "3.1.1.0.alpha") + SET(TD_VER_NUMBER "3.2.0.0.alpha") ENDIF () IF (DEFINED VERCOMPATIBLE) diff --git a/docs/en/28-releases/01-tdengine.md b/docs/en/28-releases/01-tdengine.md index c02b3227ca..6f863d8c25 100644 --- a/docs/en/28-releases/01-tdengine.md +++ b/docs/en/28-releases/01-tdengine.md @@ -10,6 +10,10 @@ For TDengine 2.x installation packages by version, please visit [here](https://t import Release from "/components/ReleaseV3"; +## 3.1.1.0 + + + ## 3.1.0.3 diff --git a/docs/zh/28-releases/01-tdengine.md b/docs/zh/28-releases/01-tdengine.md index d4e4b116b7..89bb8aaf8f 100644 --- a/docs/zh/28-releases/01-tdengine.md +++ b/docs/zh/28-releases/01-tdengine.md @@ -10,6 +10,10 @@ TDengine 2.x 各版本安装包请访问[这里](https://www.taosdata.com/all-do import Release from "/components/ReleaseV3"; +## 3.1.1.0 + + + ## 3.1.0.3