diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 5f44d3e7fc..a1f8d74571 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -156,6 +156,8 @@ static const SSysDbTableSchema streamSchema[] = { {.name = "target_table", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "sink_quota", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "history_scan_idle", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, }; static const SSysDbTableSchema streamTaskSchema[] = { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index ac53ce5d31..02d401d924 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1481,7 +1481,6 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB if (pShow->pIter == NULL) break; SColumnInfoData *pColInfo; - SName n; int32_t cols = 0; char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; @@ -1534,6 +1533,21 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&trigger, false); + char sinkQuota[20 + VARSTR_HEADER_SIZE] = {0}; + sinkQuota[0] = '0'; + char dstStr[20] = {0}; + STR_TO_VARSTR(dstStr, sinkQuota) + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char*) dstStr, false); + + char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0}; + strcpy(scanHistoryIdle, "100a"); + + memset(dstStr, 0, tListLen(dstStr)); + STR_TO_VARSTR(dstStr, scanHistoryIdle) + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char*) dstStr, false); + numOfRows++; sdbRelease(pSdb, pStream); } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 4974e77ffa..e08385c3ac 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -213,8 +213,10 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; + bool alreadyRestored = pTq->pVnode->restored; + // do not launch the stream tasks, if it is a follower or not restored vnode. - if (!(vnodeIsRoleLeader(pTq->pVnode) && pTq->pVnode->restored)) { + if (!(vnodeIsRoleLeader(pTq->pVnode) && alreadyRestored)) { return TSDB_CODE_SUCCESS; } @@ -256,7 +258,9 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { return -1; } - tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d", vgId, numOfTasks); + tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, restored:%d", vgId, numOfTasks, + alreadyRestored); + pRunReq->head.vgId = vgId; pRunReq->streamId = 0; pRunReq->taskId = STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID; diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index b76a967d0d..6dd1e5c1c3 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -75,7 +75,8 @@ struct STokenBucket { double quotaCapacity; // available capacity for maximum input size, KiloBytes per Second double quotaRemain; // not consumed bytes per second double quotaRate; // number of token per second - int64_t fillTimestamp; // fill timestamp + int64_t tokenFillTimestamp; // fill timestamp + int64_t quotaFillTimestamp; // fill timestamp }; struct SStreamQueue { diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 5540e3b6fd..6201329b95 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -299,7 +299,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { continue; } - ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId && + ASSERT(p->chkInfo.checkpointId <= p->checkpointingId && p->checkpointingId == checkpointId && p->chkInfo.checkpointVer <= p->chkInfo.processedVer); p->chkInfo.checkpointId = p->checkpointingId; diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 556de169b4..d19dfc13bf 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -388,32 +388,36 @@ int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t pBucket->quotaCapacity = quotaRate * MAX_SMOOTH_BURST_RATIO; pBucket->quotaRemain = pBucket->quotaCapacity; - pBucket->fillTimestamp = taosGetTimestampMs(); + pBucket->tokenFillTimestamp = taosGetTimestampMs(); + pBucket->quotaFillTimestamp = taosGetTimestampMs(); stDebug("s-task:%s sink quotaRate:%.2fMiB, numRate:%d", id, quotaRate, numRate); return TSDB_CODE_SUCCESS; } static void fillTokenBucket(STokenBucket* pBucket, const char* id) { int64_t now = taosGetTimestampMs(); - int64_t delta = now - pBucket->fillTimestamp; + + int64_t deltaToken = now - pBucket->tokenFillTimestamp; ASSERT(pBucket->numOfToken >= 0); - int32_t incNum = (delta / 1000.0) * pBucket->numRate; + int32_t incNum = (deltaToken / 1000.0) * pBucket->numRate; if (incNum > 0) { pBucket->numOfToken = TMIN(pBucket->numOfToken + incNum, pBucket->numCapacity); - pBucket->fillTimestamp = now; + pBucket->tokenFillTimestamp = now; } // increase the new available quota as time goes on - double incSize = (delta / 1000.0) * pBucket->quotaRate; + int64_t deltaQuota = now - pBucket->quotaFillTimestamp; + double incSize = (deltaQuota / 1000.0) * pBucket->quotaRate; if (incSize > 0) { pBucket->quotaRemain = TMIN(pBucket->quotaRemain + incSize, pBucket->quotaCapacity); - pBucket->fillTimestamp = now; + pBucket->quotaFillTimestamp = now; } if (incNum > 0 || incSize > 0) { - stTrace("token/quota available, token:%d inc:%d, quota:%.2fMiB inc:%.3fMiB, ts:%" PRId64 " idle:%" PRId64 "ms, %s", - pBucket->numOfToken, incNum, pBucket->quotaRemain, incSize, now, delta, id); + stTrace("token/quota available, token:%d inc:%d, token_TsDelta:%" PRId64 + ", quota:%.2fMiB inc:%.3fMiB quotaTs:%" PRId64 " now:%" PRId64 "ms, %s", + pBucket->numOfToken, incNum, deltaToken, pBucket->quotaRemain, incSize, deltaQuota, now, id); } } diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 2e51200fe4..0f32fd6879 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -1086,7 +1086,6 @@ _end: } int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) { - stDebug("try to write to cf parname"); #ifdef USE_ROCKSDB if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) { if (tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)) == NULL) { diff --git a/tests/system-test/0-others/information_schema.py b/tests/system-test/0-others/information_schema.py index ac952d383a..2bfe33d0af 100644 --- a/tests/system-test/0-others/information_schema.py +++ b/tests/system-test/0-others/information_schema.py @@ -217,7 +217,7 @@ class TDTestCase: tdSql.checkEqual(20470,len(tdSql.queryResult)) tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'") - tdSql.checkEqual(208, len(tdSql.queryResult)) + tdSql.checkEqual(210, len(tdSql.queryResult)) tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'") tdSql.checkEqual(54, len(tdSql.queryResult))