Merge pull request #24426 from taosdata/fix/3_liaohj

refactor: do some internal refactor.
commit dc6187a2e3
Haojun Liao, 2024-01-11 16:47:05 +08:00, committed by GitHub
12 changed files with 66 additions and 87 deletions


@@ -685,18 +685,18 @@ typedef struct STaskStatusEntry {
   int32_t statusLastDuration;  // to record the last duration of current status
   int64_t stage;
   int32_t nodeId;
   int64_t verStart;            // start version in WAL, only valid for source task
   int64_t verEnd;              // end version in WAL, only valid for source task
   int64_t processedVer;        // only valid for source task
   int64_t activeCheckpointId;  // current active checkpoint id
   int32_t chkpointTransId;     // checkpoint trans id
   bool    checkpointFailed;    // denote if the checkpoint is failed or not
   bool    inputQChanging;      // inputQ is changing or not
   int64_t inputQUnchangeCounter;
   double  inputQUsed;          // in MiB
   double  inputRate;
   double  sinkQuota;           // existed quota size for sink task
   double  sinkDataSize;        // sink to dst data size
 } STaskStatusEntry;
 
 typedef struct SStreamHbMsg {


@@ -23,7 +23,6 @@
 extern "C" {
 #endif
 
-typedef struct SQWorkerPool SQWorkerPool;
 typedef struct SWWorkerPool SWWorkerPool;
 
 typedef struct SQueueWorker {
@@ -60,14 +59,14 @@ typedef struct SWWorker {
   SWWorkerPool *pool;
 } SWWorker;
 
-typedef struct SWWorkerPool {
+struct SWWorkerPool {
   int32_t       max;     // max number of workers
   int32_t       num;
   int32_t       nextId;  // from 0 to max-1, cyclic
   const char   *name;
   SWWorker     *workers;
   TdThreadMutex mutex;
-} SWWorkerPool;
+};
 
 int32_t tQWorkerInit(SQWorkerPool *pool);
 void    tQWorkerCleanup(SQWorkerPool *pool);
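Note: the forward declaration of SQWorkerPool is dropped here (it is presumably declared elsewhere, since tQWorkerInit still takes an SQWorkerPool*), and SWWorkerPool goes from a typedef'd definition to a bare struct definition, which works only because the one-line typedef kept above it already introduces the name. A minimal sketch of that C idiom, with hypothetical names:

#include <stdint.h>

/* one typedef'd forward declaration introduces the name... */
typedef struct Pool Pool;

/* ...so the definition can use the bare tag, with no second typedef */
struct Pool {
  int32_t max;  /* mirrors SWWorkerPool.max */
  int32_t num;  /* mirrors SWWorkerPool.num */
};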


@@ -166,7 +166,7 @@ static const SSysDbTableSchema streamTaskSchema[] = {
     {.name = "node_type", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
     {.name = "node_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
     {.name = "level", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
-    {.name = "status", .bytes = 15 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
+    {.name = "status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
     {.name = "stage", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
     {.name = "in_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
     // {.name = "out_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
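Note: the status column of the stream-task system table shrinks from 15 to 12 payload bytes. VARCHAR column widths in these schemas budget the longest expected string plus the varstr header; a sketch of that sizing rule, with an assumed header size:

#include <stdint.h>

#define VARSTR_HEADER_SIZE 2  /* assumption for illustration; the real value comes from TDengine headers */

/* widest status string now expected to fit in 12 bytes */
enum { STATUS_COL_BYTES = 12 + VARSTR_HEADER_SIZE };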


@@ -58,7 +58,7 @@ int32_t tsNumOfMnodeQueryThreads = 4;
 int32_t tsNumOfMnodeFetchThreads = 1;
 int32_t tsNumOfMnodeReadThreads = 1;
 int32_t tsNumOfVnodeQueryThreads = 4;
-float   tsRatioOfVnodeStreamThreads = 4.0;
+float   tsRatioOfVnodeStreamThreads = 1.0;
 int32_t tsNumOfVnodeFetchThreads = 4;
 int32_t tsNumOfVnodeRsmaThreads = 2;
 int32_t tsNumOfQnodeQueryThreads = 4;
@@ -622,7 +622,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
                   0)
     return -1;
-  if (cfgAddFloat(pCfg, "ratioOfVnodeStreamThreads", tsRatioOfVnodeStreamThreads, 0.01, 100, CFG_SCOPE_SERVER,
+  if (cfgAddFloat(pCfg, "ratioOfVnodeStreamThreads", tsRatioOfVnodeStreamThreads, 0.01, 10, CFG_SCOPE_SERVER,
                   CFG_DYN_NONE) != 0)
     return -1;
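Note: the default ratioOfVnodeStreamThreads drops from 4.0 to 1.0 and the permitted range narrows from [0.01, 100] to [0.01, 10]. This diff does not show how the ratio becomes a thread count; a runnable sketch under the assumption that it scales the core count:

#include <stdio.h>

/* Assumption: pool size = cores * ratio, clamped to at least 1.
   The actual TDengine formula is not part of this diff. */
static int deriveStreamThreads(int numOfCores, float ratio) {
  int n = (int)(numOfCores * ratio);
  return (n < 1) ? 1 : n;
}

int main(void) {
  printf("8 cores, old default 4.0 -> %d stream threads\n", deriveStreamThreads(8, 4.0f));
  printf("8 cores, new default 1.0 -> %d stream threads\n", deriveStreamThreads(8, 1.0f));
  return 0;
}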


@@ -407,12 +407,8 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
   if (tWWorkerInit(pFPool) != 0) return -1;
 
   SSingleWorkerCfg mgmtCfg = {
-      .min = 1,
-      .max = 1,
-      .name = "vnode-mgmt",
-      .fp = (FItem)vmProcessMgmtQueue,
-      .param = pMgmt,
-  };
+      .min = 1, .max = 1, .name = "vnode-mgmt", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt};
+
   if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) return -1;
 
   dDebug("vnode workers are initialized");


@@ -72,7 +72,7 @@ int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream
 int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pTransName, int64_t streamUid);
 int32_t mndAddtoCheckpointWaitingList(SStreamObj *pStream, int64_t checkpointId);
 bool    mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamUid, const char *pTransName, bool lock);
-int32_t mndStreamGetRelCheckpointTrans(SMnode *pMnode, int64_t streamUid);
+int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamUid);
 
 // for sma
 // TODO refactor


@@ -81,7 +81,7 @@ static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExe
 static void    saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode);
 static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
 static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len);
-static void    killCheckpointTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName);
+static void    killTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName);
 static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList);
 static void    freeCheckpointCandEntry(void *);
@@ -95,9 +95,6 @@ static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream);
 static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream);
 static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream);
 
-static SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
-static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
-
 int32_t mndInitStream(SMnode *pMnode) {
   SSdbTable table = {
       .sdbType = SDB_STREAM,
@@ -1455,9 +1452,10 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
   }
 
   // kill the related checkpoint trans
-  int32_t transId = mndStreamGetRelCheckpointTrans(pMnode, pStream->uid);
+  int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid);
   if (transId != 0) {
-    killCheckpointTransImpl(pMnode, transId, pStream->sourceDb);
+    mDebug("drop active related transId:%d due to stream:%s dropped", transId, pStream->name);
+    killTransImpl(pMnode, transId, pStream->sourceDb);
   }
 
   removeStreamTasksInBuf(pStream, &execInfo);
@@ -1502,9 +1500,10 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
 #endif
 
     // kill the related checkpoint trans
-    int32_t transId = mndStreamGetRelCheckpointTrans(pMnode, pStream->uid);
+    int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid);
     if (transId != 0) {
-      killCheckpointTransImpl(pMnode, transId, pStream->sourceDb);
+      mDebug("drop active related transId:%d due to stream:%s dropped", transId, pStream->name);
+      killTransImpl(pMnode, transId, pStream->sourceDb);
     }
 
     // drop the stream obj in execInfo
@@ -1728,7 +1727,7 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS
     colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
 
     // output queue
     // sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate);
     // STR_TO_VARSTR(vbuf, buf);
     // pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
@@ -2836,10 +2835,10 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
   return TSDB_CODE_ACTION_IN_PROGRESS;
 }
 
-void killCheckpointTransImpl(SMnode* pMnode, int32_t transId, const char* pDbName) {
+void killTransImpl(SMnode* pMnode, int32_t transId, const char* pDbName) {
   STrans *pTrans = mndAcquireTrans(pMnode, transId);
   if (pTrans != NULL) {
-    mInfo("kill checkpoint transId:%d in Db:%s", transId, pDbName);
+    mInfo("kill active transId:%d in Db:%s", transId, pDbName);
     mndKillTrans(pMnode, pTrans);
     mndReleaseTrans(pMnode, pTrans);
   }
@@ -2859,7 +2858,7 @@ int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) {
   }
 
   char* pDupDBName = strndup(pDBName, len);
-  killCheckpointTransImpl(pMnode, pTransInfo->transId, pDupDBName);
+  killTransImpl(pMnode, pTransInfo->transId, pDupDBName);
   taosMemoryFree(pDupDBName);
 
   return TSDB_CODE_SUCCESS;
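Note: killCheckpointTransImpl becomes killTransImpl because it now kills any related stream transaction, not just checkpoints (see the widened match in mndStreamGetRelTrans below). The body keeps the acquire/kill/release discipline around mndKillTrans; a stand-in sketch of that pattern, with simplified types:

#include <stdio.h>

typedef struct Trans { int id; } Trans;  /* stand-in for STrans */

static Trans *acquireTrans(int transId)   { static Trans t; t.id = transId; return &t; }
static void   killTrans(Trans *pTrans)    { printf("killed transId:%d\n", pTrans->id); }
static void   releaseTrans(Trans *pTrans) { (void)pTrans; /* drop the reference */ }

static void killTransSketch(int transId) {
  Trans *pTrans = acquireTrans(transId);  /* mirrors mndAcquireTrans */
  if (pTrans != NULL) {                   /* the trans may already be gone */
    killTrans(pTrans);                    /* mirrors mndKillTrans */
    releaseTrans(pTrans);                 /* mirrors mndReleaseTrans */
  }
}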


@@ -113,7 +113,7 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamUid, const char*
   return false;
 }
 
-int32_t mndStreamGetRelCheckpointTrans(SMnode* pMnode, int64_t streamUid) {
+int32_t mndStreamGetRelTrans(SMnode* pMnode, int64_t streamUid) {
   taosThreadMutexLock(&execInfo.lock);
   int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans);
   if (num <= 0) {
@@ -127,7 +127,7 @@ int32_t mndStreamGetRelCheckpointTrans(SMnode* pMnode, int64_t streamUid) {
     SStreamTransInfo tInfo = *pEntry;
     taosThreadMutexUnlock(&execInfo.lock);
 
-    if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) {
+    if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0 || strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0) {
      return tInfo.transId;
    }
  } else {
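Note: the renamed mndStreamGetRelTrans now treats both checkpoint and task-update transactions as related to a stream. The widened predicate, isolated as a sketch (the macro string values are assumptions; only the macro names appear in the diff):

#include <stdbool.h>
#include <string.h>

#define MND_STREAM_CHECKPOINT_NAME  "stream-checkpoint"   /* assumed value */
#define MND_STREAM_TASK_UPDATE_NAME "stream-task-update"  /* assumed value */

static bool isRelatedStreamTrans(const char *transName) {
  return strcmp(transName, MND_STREAM_CHECKPOINT_NAME) == 0 ||
         strcmp(transName, MND_STREAM_TASK_UPDATE_NAME) == 0;
}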


@@ -548,17 +548,9 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
   return code;
 }
 
-static void setTaskSchedInfo(SStreamTask* pTask, int32_t idleTime) {
-  SStreamStatus* pStatus = &pTask->status;
-
-  pStatus->schedIdleTime = idleTime;
-  pStatus->lastExecTs = taosGetTimestampMs();
-}
-
-static void clearTaskSchedInfo(SStreamTask* pTask) {
-  SStreamStatus* pStatus = &pTask->status;
-  pStatus->schedIdleTime = 0;
-}
+static void setTaskSchedInfo(SStreamTask* pTask, int32_t idleTime) { pTask->status.schedIdleTime = idleTime; }
+static void clearTaskSchedInfo(SStreamTask* pTask) { pTask->status.schedIdleTime = 0; }
+static void setLastExecTs(SStreamTask* pTask, int64_t ts) { pTask->status.lastExecTs = ts; }
 
 /**
  * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
@@ -574,21 +566,28 @@ int32_t doStreamExecTask(SStreamTask* pTask) {
     int32_t           blockSize = 0;
     int32_t           numOfBlocks = 0;
     SStreamQueueItem* pInput = NULL;
     if (streamTaskShouldStop(pTask) || (streamTaskGetStatus(pTask)->state == TASK_STATUS__UNINIT)) {
       stDebug("s-task:%s stream task is stopped", id);
-      break;
+      return 0;
     }
 
     if (streamQueueIsFull(pTask->outputq.queue)) {
       stWarn("s-task:%s outputQ is full, idle for 500ms and retry", id);
       setTaskSchedInfo(pTask, 500);
-      break;
+      return 0;
     }
 
     if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
-      stWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry", pTask->id.idStr);
+      stWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry", id);
       setTaskSchedInfo(pTask, 1000);
-      break;
+      return 0;
+    }
+
+    if (taosGetTimestampMs() - pTask->status.lastExecTs < 50) {
+      stDebug("s-task:%s invoke with high frequency, idle and retry exec in 50ms", id);
+      setTaskSchedInfo(pTask, 50);
+      return 0;
     }
 
     /*int32_t code = */ streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize);
@@ -597,9 +596,8 @@ int32_t doStreamExecTask(SStreamTask* pTask) {
       return 0;
     }
 
-    int32_t type = pInput->type;
-
     // dispatch checkpoint msg to all downstream tasks
+    int32_t type = pInput->type;
     if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
       streamProcessCheckpointBlock(pTask, (SStreamDataBlock*)pInput);
       continue;
@@ -646,7 +644,7 @@ int32_t doStreamExecTask(SStreamTask* pTask) {
       if (ver != pInfo->processedVer) {
         stDebug("s-task:%s update processedVer(unsaved) from %" PRId64 " to %" PRId64 " nextProcessVer:%" PRId64
                 " ckpt:%" PRId64,
-                pTask->id.idStr, pInfo->processedVer, ver, pInfo->nextProcessVer, pInfo->checkpointVer);
+                id, pInfo->processedVer, ver, pInfo->nextProcessVer, pInfo->checkpointVer);
         pInfo->processedVer = ver;
       }
 
@@ -659,7 +657,7 @@ int32_t doStreamExecTask(SStreamTask* pTask) {
       // todo add lock
       SStreamTaskState* pState = streamTaskGetStatus(pTask);
       if (pState->state == TASK_STATUS__CK) {
-        stDebug("s-task:%s checkpoint block received, set status:%s", pTask->id.idStr, pState->name);
+        stDebug("s-task:%s checkpoint block received, set status:%s", id, pState->name);
         streamTaskBuildCheckpoint(pTask);
       } else {
         // todo refactor
@@ -672,8 +670,8 @@ int32_t doStreamExecTask(SStreamTask* pTask) {
         if (code != TSDB_CODE_SUCCESS) {
           // todo: let's retry send rsp to upstream/mnode
-          stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%d, code:%s", pTask->id.idStr,
-                  0, tstrerror(code));
+          stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%d, code:%s", id, 0,
+                  tstrerror(code));
         }
       }
@@ -774,19 +772,24 @@ int32_t streamResumeTask(SStreamTask* pTask) {
       // check if this task needs to be idle for a while
       if (pTask->status.schedIdleTime > 0) {
+        stDebug("s-task:%s idled, and will be invoked in %dms", id, pTask->status.schedIdleTime);
         schedTaskInFuture(pTask);
 
         taosThreadMutexUnlock(&pTask->lock);
+        setLastExecTs(pTask, taosGetTimestampMs());
         return 0;
       } else {
        int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue);
        if ((numOfItems == 0) || streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) {
          atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
          taosThreadMutexUnlock(&pTask->lock);
+          setLastExecTs(pTask, taosGetTimestampMs());
 
          char* p = streamTaskGetStatus(pTask)->name;
-          stDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, p, pTask->status.schedStatus);
+          stDebug("s-task:%s exec completed, status:%s, sched-status:%d, lastExecTs:%" PRId64, id, p,
+                  pTask->status.schedStatus, pTask->status.lastExecTs);
 
          return 0;
        }
      }
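Note: inside doStreamExecTask the early-exit paths switch from break to return 0, and a new guard idles the task for 50 ms when it is re-invoked too soon after its last run; lastExecTs is now stamped in streamResumeTask via setLastExecTs rather than inside setTaskSchedInfo. A stand-in sketch of the throttle, reduced to the two status fields it touches:

#include <stdint.h>

typedef struct {
  int32_t schedIdleTime;  /* idle hint later consumed by streamResumeTask */
  int64_t lastExecTs;     /* stamped on each exit via setLastExecTs */
} SchedStatus;            /* stand-in for the relevant SStreamStatus fields */

/* returns 1 when the caller should give up this invocation (the return-0 paths) */
static int shouldThrottle(SchedStatus *pStatus, int64_t nowMs) {
  if (nowMs - pStatus->lastExecTs < 50) {
    pStatus->schedIdleTime = 50;  /* same effect as setTaskSchedInfo(pTask, 50) */
    return 1;
  }
  return 0;
}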


@@ -1102,7 +1102,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
         .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)),
     };
 
-    entry.inputRate = entry.inputQUsed * 100.0 / STREAM_TASK_QUEUE_CAPACITY_IN_SIZE;
+    entry.inputRate = entry.inputQUsed * 100.0 / (2*STREAM_TASK_QUEUE_CAPACITY_IN_SIZE);
     if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) {
       entry.sinkQuota = (*pTask)->outputInfo.pTokenBucket->quotaRate;
       entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize);
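Note: the heartbeat now measures input-queue usage against twice the capacity constant, halving the percentage reported to the mnode. The computation, isolated as a sketch (the capacity value below is an assumption; only the constant's name appears in the diff):

/* assumed capacity in MiB, for illustration only */
#define QUEUE_CAPACITY_IN_MiB 30.0

static double computeInputRate(double inputQUsedMiB) {
  /* usage measured against 2x the capacity constant, as in the new code */
  return inputQUsedMiB * 100.0 / (2 * QUEUE_CAPACITY_IN_MiB);
}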


@@ -17,8 +17,7 @@
 #define MAX_STREAM_EXEC_BATCH_NUM 32
 #define MAX_SMOOTH_BURST_RATIO    5  // 5 sec
-#define WAIT_FOR_DURATION         40
-#define OUTPUT_QUEUE_FULL_WAIT_DURATION 500  // 500 ms
+#define WAIT_FOR_DURATION         10
 
 // todo refactor:
 // read data from input queue
@@ -161,7 +160,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
     // no available token in bucket for sink task, let's wait for a little bit
     if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, pTask->id.idStr))) {
       stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id);
-      taosMsleep(10);
+      taosMsleep(WAIT_FOR_DURATION);
       return TSDB_CODE_SUCCESS;
     }
@@ -173,11 +172,10 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
     SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputq.queue);
     if (qItem == NULL) {
-      if ((taskLevel == TASK_LEVEL__SOURCE || taskLevel == TASK_LEVEL__SINK) && (++retryTimes) < MAX_RETRY_TIMES) {
-        taosMsleep(WAIT_FOR_DURATION);
-        // todo remove it
-        continue;
-      }
+      // if ((taskLevel == TASK_LEVEL__SOURCE || taskLevel == TASK_LEVEL__SINK) && (++retryTimes) < MAX_RETRY_TIMES) {
+      //   taosMsleep(WAIT_FOR_DURATION);
+      //   continue;
+      // }
 
       // restore the token to bucket
       if (*numOfBlocks > 0) {
@@ -344,25 +342,6 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
 
 // the result should be put into the outputQ in any cases, the result may be lost otherwise.
 int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) {
   STaosQueue* pQueue = pTask->outputq.queue->pQueue;
-
-#if 0
-  // wait for the output queue is available for new data to dispatch
-  while (streamQueueIsFull(pTask->outputq.queue)) {
-    if (streamTaskShouldStop(pTask)) {
-      stInfo("s-task:%s discard result block due to task stop", pTask->id.idStr);
-      return TSDB_CODE_STREAM_EXEC_CANCELLED;
-    }
-
-    int32_t total = streamQueueGetNumOfItems(pTask->outputq.queue);
-    double  size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
-    // let's wait for there are enough space to hold this result pBlock
-    stDebug("s-task:%s outputQ is full, wait for %dms and retry, outputQ items:%d, size:%.2fMiB", pTask->id.idStr,
-            OUTPUT_QUEUE_FULL_WAIT_DURATION, total, size);
-    taosMsleep(OUTPUT_QUEUE_FULL_WAIT_DURATION);
-  }
-#endif
 
   int32_t code = taosWriteQitem(pQueue, pBlock);
 
   int32_t total = streamQueueGetNumOfItems(pTask->outputq.queue);
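Note: the disabled #if 0 busy-wait is deleted outright along with its OUTPUT_QUEUE_FULL_WAIT_DURATION define; streamTaskPutDataIntoOutputQ now always enqueues, and back pressure is handled by the outputQ-full check in doStreamExecTask, which parks the task with a 500 ms idle hint instead of sleeping on the worker thread. A stand-in sketch of the non-blocking shape:

#include <stdio.h>

typedef struct { int items; } Queue;  /* stand-in for STaosQueue */

static int writeQitem(Queue *pQueue, void *pItem) { (void)pItem; pQueue->items++; return 0; }

/* always enqueue, never sleep here; fullness is observed by the producer
   loop, which reschedules the task with an idle hint instead of blocking */
static int putIntoOutputQ(Queue *pQueue, void *pBlock) {
  int code = writeQitem(pQueue, pBlock);  /* mirrors taosWriteQitem */
  printf("outputQ items:%d, code:%d\n", pQueue->items, code);
  return code;
}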


@@ -59,6 +59,9 @@ class TDTestCase:
         self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {self.tb_name} partition by {partition} {partition_elm_alias} state_window({stream_state_window})', trigger_mode="at_once", subtable_value=tb_subtable_value, fill_history_value=fill_history_value)
         range_times = self.tdCom.range_count
         state_window_max = self.tdCom.dataDict['state_window_max']
+        time.sleep(2)
         for i in range(range_times):
             state_window_value = random.randint(int((i)*state_window_max/range_times), int((i+1)*state_window_max/range_times))
         for i in range(2, range_times+3):