From 58694c67dd73b187c6972d11c686997e2308f1f8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Jan 2024 11:50:52 +0800 Subject: [PATCH 1/3] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 18 +++++++++--------- include/util/tworker.h | 5 ++--- source/common/src/systable.c | 2 +- source/common/src/tglobal.c | 4 ++-- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 8 ++------ source/dnode/mnode/impl/src/mndStream.c | 2 +- source/libs/stream/src/streamMeta.c | 2 +- 7 files changed, 18 insertions(+), 23 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index d11a4ad23b..9ea655c15c 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -685,18 +685,18 @@ typedef struct STaskStatusEntry { int32_t statusLastDuration; // to record the last duration of current status int64_t stage; int32_t nodeId; - int64_t verStart; // start version in WAL, only valid for source task - int64_t verEnd; // end version in WAL, only valid for source task - int64_t processedVer; // only valid for source task + int64_t verStart; // start version in WAL, only valid for source task + int64_t verEnd; // end version in WAL, only valid for source task + int64_t processedVer; // only valid for source task int64_t activeCheckpointId; // current active checkpoint id - int32_t chkpointTransId; // checkpoint trans id - bool checkpointFailed; // denote if the checkpoint is failed or not - bool inputQChanging; // inputQ is changing or not + int32_t chkpointTransId; // checkpoint trans id + bool checkpointFailed; // denote if the checkpoint is failed or not + bool inputQChanging; // inputQ is changing or not int64_t inputQUnchangeCounter; - double inputQUsed; // in MiB + double inputQUsed; // in MiB double inputRate; - double sinkQuota; // existed quota size for sink task - double sinkDataSize; // sink to dst data size + double sinkQuota; // existed quota size for sink task + double sinkDataSize; // sink to dst data size } STaskStatusEntry; typedef struct SStreamHbMsg { diff --git a/include/util/tworker.h b/include/util/tworker.h index 8508adf052..f39540d24b 100644 --- a/include/util/tworker.h +++ b/include/util/tworker.h @@ -23,7 +23,6 @@ extern "C" { #endif -typedef struct SQWorkerPool SQWorkerPool; typedef struct SWWorkerPool SWWorkerPool; typedef struct SQueueWorker { @@ -60,14 +59,14 @@ typedef struct SWWorker { SWWorkerPool *pool; } SWWorker; -typedef struct SWWorkerPool { +struct SWWorkerPool { int32_t max; // max number of workers int32_t num; int32_t nextId; // from 0 to max-1, cyclic const char *name; SWWorker *workers; TdThreadMutex mutex; -} SWWorkerPool; +}; int32_t tQWorkerInit(SQWorkerPool *pool); void tQWorkerCleanup(SQWorkerPool *pool); diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 2e52c77080..75a54a0cd5 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -166,7 +166,7 @@ static const SSysDbTableSchema streamTaskSchema[] = { {.name = "node_type", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "node_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, {.name = "level", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, - {.name = "status", .bytes = 15 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "stage", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "in_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, // {.name = "out_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 79d21955d4..b72c4fe077 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -58,7 +58,7 @@ int32_t tsNumOfMnodeQueryThreads = 4; int32_t tsNumOfMnodeFetchThreads = 1; int32_t tsNumOfMnodeReadThreads = 1; int32_t tsNumOfVnodeQueryThreads = 4; -float tsRatioOfVnodeStreamThreads = 4.0; +float tsRatioOfVnodeStreamThreads = 1.0; int32_t tsNumOfVnodeFetchThreads = 4; int32_t tsNumOfVnodeRsmaThreads = 2; int32_t tsNumOfQnodeQueryThreads = 4; @@ -621,7 +621,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { 0) return -1; - if (cfgAddFloat(pCfg, "ratioOfVnodeStreamThreads", tsRatioOfVnodeStreamThreads, 0.01, 100, CFG_SCOPE_SERVER, + if (cfgAddFloat(pCfg, "ratioOfVnodeStreamThreads", tsRatioOfVnodeStreamThreads, 0.01, 10, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 9a792a2774..8b80527447 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -407,12 +407,8 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { if (tWWorkerInit(pFPool) != 0) return -1; SSingleWorkerCfg mgmtCfg = { - .min = 1, - .max = 1, - .name = "vnode-mgmt", - .fp = (FItem)vmProcessMgmtQueue, - .param = pMgmt, - }; + .min = 1, .max = 1, .name = "vnode-mgmt", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt}; + if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) return -1; dDebug("vnode workers are initialized"); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 146f9f6fc4..95150b9d6e 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1728,7 +1728,7 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); // output queue - // sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate); +// sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate); // STR_TO_VARSTR(vbuf, buf); // pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index de435c63a3..04d34b0945 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1102,7 +1102,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)), }; - entry.inputRate = entry.inputQUsed * 100.0 / STREAM_TASK_QUEUE_CAPACITY_IN_SIZE; + entry.inputRate = entry.inputQUsed * 100.0 / (2*STREAM_TASK_QUEUE_CAPACITY_IN_SIZE); if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) { entry.sinkQuota = (*pTask)->outputInfo.pTokenBucket->quotaRate; entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize); From 5f7ce21530cfc7535dc8fc80aedb03a27d60235b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Jan 2024 11:59:29 +0800 Subject: [PATCH 2/3] refactor: do some internal refactor. --- source/dnode/mnode/impl/inc/mndStream.h | 2 +- source/dnode/mnode/impl/src/mndStream.c | 21 ++++++++++---------- source/dnode/mnode/impl/src/mndStreamTrans.c | 4 ++-- 3 files changed, 13 insertions(+), 14 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 7b71ad873b..58a4c92d3e 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -72,7 +72,7 @@ int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pTransName, int64_t streamUid); int32_t mndAddtoCheckpointWaitingList(SStreamObj *pStream, int64_t checkpointId); bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamUid, const char *pTransName, bool lock); -int32_t mndStreamGetRelCheckpointTrans(SMnode *pMnode, int64_t streamUid); +int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamUid); // for sma // TODO refactor diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 95150b9d6e..a69543a6d6 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -81,7 +81,7 @@ static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExe static void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode); static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot); static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len); -static void killCheckpointTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName); +static void killTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName); static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList); static void freeCheckpointCandEntry(void *); @@ -95,9 +95,6 @@ static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream); static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream); static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream); -static SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); -static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); - int32_t mndInitStream(SMnode *pMnode) { SSdbTable table = { .sdbType = SDB_STREAM, @@ -1455,9 +1452,10 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { } // kill the related checkpoint trans - int32_t transId = mndStreamGetRelCheckpointTrans(pMnode, pStream->uid); + int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid); if (transId != 0) { - killCheckpointTransImpl(pMnode, transId, pStream->sourceDb); + mDebug("drop active related transId:%d due to stream:%s dropped", transId, pStream->name); + killTransImpl(pMnode, transId, pStream->sourceDb); } removeStreamTasksInBuf(pStream, &execInfo); @@ -1502,9 +1500,10 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { #endif // kill the related checkpoint trans - int32_t transId = mndStreamGetRelCheckpointTrans(pMnode, pStream->uid); + int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid); if (transId != 0) { - killCheckpointTransImpl(pMnode, transId, pStream->sourceDb); + mDebug("drop active related transId:%d due to stream:%s dropped", transId, pStream->name); + killTransImpl(pMnode, transId, pStream->sourceDb); } // drop the stream obj in execInfo @@ -2836,10 +2835,10 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { return TSDB_CODE_ACTION_IN_PROGRESS; } -void killCheckpointTransImpl(SMnode* pMnode, int32_t transId, const char* pDbName) { +void killTransImpl(SMnode* pMnode, int32_t transId, const char* pDbName) { STrans *pTrans = mndAcquireTrans(pMnode, transId); if (pTrans != NULL) { - mInfo("kill checkpoint transId:%d in Db:%s", transId, pDbName); + mInfo("kill active transId:%d in Db:%s", transId, pDbName); mndKillTrans(pMnode, pTrans); mndReleaseTrans(pMnode, pTrans); } @@ -2859,7 +2858,7 @@ int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) { } char* pDupDBName = strndup(pDBName, len); - killCheckpointTransImpl(pMnode, pTransInfo->transId, pDupDBName); + killTransImpl(pMnode, pTransInfo->transId, pDupDBName); taosMemoryFree(pDupDBName); return TSDB_CODE_SUCCESS; diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index f5047acc49..bfea4349b0 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -113,7 +113,7 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamUid, const char* return false; } -int32_t mndStreamGetRelCheckpointTrans(SMnode* pMnode, int64_t streamUid) { +int32_t mndStreamGetRelTrans(SMnode* pMnode, int64_t streamUid) { taosThreadMutexLock(&execInfo.lock); int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans); if (num <= 0) { @@ -127,7 +127,7 @@ int32_t mndStreamGetRelCheckpointTrans(SMnode* pMnode, int64_t streamUid) { SStreamTransInfo tInfo = *pEntry; taosThreadMutexUnlock(&execInfo.lock); - if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) { + if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0 || strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0) { return tInfo.transId; } } else { From d4fd544c745a3f46ef26ca7d9557945a8010d375 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Jan 2024 15:18:11 +0800 Subject: [PATCH 3/3] refactor: remove sleep. --- source/libs/stream/src/streamExec.c | 49 ++++++++++--------- source/libs/stream/src/streamQueue.c | 33 +++---------- .../8-stream/at_once_state_window.py | 3 ++ 3 files changed, 35 insertions(+), 50 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index eb5a432235..18f7ed061a 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -548,17 +548,9 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock return code; } -static void setTaskSchedInfo(SStreamTask* pTask, int32_t idleTime) { - SStreamStatus* pStatus = &pTask->status; - - pStatus->schedIdleTime = idleTime; - pStatus->lastExecTs = taosGetTimestampMs(); -} - -static void clearTaskSchedInfo(SStreamTask* pTask) { - SStreamStatus* pStatus = &pTask->status; - pStatus->schedIdleTime = 0; -} +static void setTaskSchedInfo(SStreamTask* pTask, int32_t idleTime) { pTask->status.schedIdleTime = idleTime; } +static void clearTaskSchedInfo(SStreamTask* pTask) { pTask->status.schedIdleTime = 0; } +static void setLastExecTs(SStreamTask* pTask, int64_t ts) { pTask->status.lastExecTs = ts; } /** * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the @@ -574,21 +566,28 @@ int32_t doStreamExecTask(SStreamTask* pTask) { int32_t blockSize = 0; int32_t numOfBlocks = 0; SStreamQueueItem* pInput = NULL; + if (streamTaskShouldStop(pTask) || (streamTaskGetStatus(pTask)->state == TASK_STATUS__UNINIT)) { stDebug("s-task:%s stream task is stopped", id); - break; + return 0; } if (streamQueueIsFull(pTask->outputq.queue)) { stWarn("s-task:%s outputQ is full, idle for 500ms and retry", id); setTaskSchedInfo(pTask, 500); - break; + return 0; } if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) { - stWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry", pTask->id.idStr); + stWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry", id); setTaskSchedInfo(pTask, 1000); - break; + return 0; + } + + if (taosGetTimestampMs() - pTask->status.lastExecTs < 50) { + stDebug("s-task:%s invoke with high frequency, idle and retry exec in 50ms", id); + setTaskSchedInfo(pTask, 50); + return 0; } /*int32_t code = */ streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize); @@ -597,9 +596,8 @@ int32_t doStreamExecTask(SStreamTask* pTask) { return 0; } - int32_t type = pInput->type; - // dispatch checkpoint msg to all downstream tasks + int32_t type = pInput->type; if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) { streamProcessCheckpointBlock(pTask, (SStreamDataBlock*)pInput); continue; @@ -646,7 +644,7 @@ int32_t doStreamExecTask(SStreamTask* pTask) { if (ver != pInfo->processedVer) { stDebug("s-task:%s update processedVer(unsaved) from %" PRId64 " to %" PRId64 " nextProcessVer:%" PRId64 " ckpt:%" PRId64, - pTask->id.idStr, pInfo->processedVer, ver, pInfo->nextProcessVer, pInfo->checkpointVer); + id, pInfo->processedVer, ver, pInfo->nextProcessVer, pInfo->checkpointVer); pInfo->processedVer = ver; } @@ -659,7 +657,7 @@ int32_t doStreamExecTask(SStreamTask* pTask) { // todo add lock SStreamTaskState* pState = streamTaskGetStatus(pTask); if (pState->state == TASK_STATUS__CK) { - stDebug("s-task:%s checkpoint block received, set status:%s", pTask->id.idStr, pState->name); + stDebug("s-task:%s checkpoint block received, set status:%s", id, pState->name); streamTaskBuildCheckpoint(pTask); } else { // todo refactor @@ -672,8 +670,8 @@ int32_t doStreamExecTask(SStreamTask* pTask) { if (code != TSDB_CODE_SUCCESS) { // todo: let's retry send rsp to upstream/mnode - stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%d, code:%s", pTask->id.idStr, - 0, tstrerror(code)); + stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%d, code:%s", id, 0, + tstrerror(code)); } } @@ -774,19 +772,24 @@ int32_t streamResumeTask(SStreamTask* pTask) { // check if this task needs to be idle for a while if (pTask->status.schedIdleTime > 0) { - stDebug("s-task:%s idled, and will be invoked in %dms", id, pTask->status.schedIdleTime); schedTaskInFuture(pTask); taosThreadMutexUnlock(&pTask->lock); + setLastExecTs(pTask, taosGetTimestampMs()); return 0; } else { int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue); + if ((numOfItems == 0) || streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); taosThreadMutexUnlock(&pTask->lock); + setLastExecTs(pTask, taosGetTimestampMs()); + char* p = streamTaskGetStatus(pTask)->name; - stDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, p, pTask->status.schedStatus); + stDebug("s-task:%s exec completed, status:%s, sched-status:%d, lastExecTs:%" PRId64, id, p, + pTask->status.schedStatus, pTask->status.lastExecTs); + return 0; } } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 5f55285ab1..d3287a6b96 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -17,8 +17,7 @@ #define MAX_STREAM_EXEC_BATCH_NUM 32 #define MAX_SMOOTH_BURST_RATIO 5 // 5 sec -#define WAIT_FOR_DURATION 40 -#define OUTPUT_QUEUE_FULL_WAIT_DURATION 500 // 500 ms +#define WAIT_FOR_DURATION 10 // todo refactor: // read data from input queue @@ -161,7 +160,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu // no available token in bucket for sink task, let's wait for a little bit if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, pTask->id.idStr))) { stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id); - taosMsleep(10); + taosMsleep(WAIT_FOR_DURATION); return TSDB_CODE_SUCCESS; } @@ -173,11 +172,10 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputq.queue); if (qItem == NULL) { - if ((taskLevel == TASK_LEVEL__SOURCE || taskLevel == TASK_LEVEL__SINK) && (++retryTimes) < MAX_RETRY_TIMES) { - taosMsleep(WAIT_FOR_DURATION); - // todo remove it - continue; - } +// if ((taskLevel == TASK_LEVEL__SOURCE || taskLevel == TASK_LEVEL__SINK) && (++retryTimes) < MAX_RETRY_TIMES) { +// taosMsleep(WAIT_FOR_DURATION); +// continue; +// } // restore the token to bucket if (*numOfBlocks > 0) { @@ -344,25 +342,6 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) // the result should be put into the outputQ in any cases, the result may be lost otherwise. int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) { STaosQueue* pQueue = pTask->outputq.queue->pQueue; - -#if 0 - // wait for the output queue is available for new data to dispatch - while (streamQueueIsFull(pTask->outputq.queue)) { - if (streamTaskShouldStop(pTask)) { - stInfo("s-task:%s discard result block due to task stop", pTask->id.idStr); - return TSDB_CODE_STREAM_EXEC_CANCELLED; - } - - int32_t total = streamQueueGetNumOfItems(pTask->outputq.queue); - double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue)); - // let's wait for there are enough space to hold this result pBlock - stDebug("s-task:%s outputQ is full, wait for %dms and retry, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, - OUTPUT_QUEUE_FULL_WAIT_DURATION, total, size); - - taosMsleep(OUTPUT_QUEUE_FULL_WAIT_DURATION); - } -#endif - int32_t code = taosWriteQitem(pQueue, pBlock); int32_t total = streamQueueGetNumOfItems(pTask->outputq.queue); diff --git a/tests/system-test/8-stream/at_once_state_window.py b/tests/system-test/8-stream/at_once_state_window.py index fa9f4ddd78..60a4f4e890 100644 --- a/tests/system-test/8-stream/at_once_state_window.py +++ b/tests/system-test/8-stream/at_once_state_window.py @@ -59,6 +59,9 @@ class TDTestCase: self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {self.tb_name} partition by {partition} {partition_elm_alias} state_window({stream_state_window})', trigger_mode="at_once", subtable_value=tb_subtable_value, fill_history_value=fill_history_value) range_times = self.tdCom.range_count state_window_max = self.tdCom.dataDict['state_window_max'] + + time.sleep(2) + for i in range(range_times): state_window_value = random.randint(int((i)*state_window_max/range_times), int((i+1)*state_window_max/range_times)) for i in range(2, range_times+3):