Merge pull request #23763 from taosdata/fix/pause_stream

fix(stream): fix error in generating token in bucket.
This commit is contained in:
Haojun Liao 2023-11-21 15:46:25 +08:00 committed by GitHub
commit 8c4558c5c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 39 additions and 15 deletions

View File

@ -156,6 +156,8 @@ static const SSysDbTableSchema streamSchema[] = {
{.name = "target_table", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "target_table", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
{.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "sink_quota", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "history_scan_idle", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
}; };
static const SSysDbTableSchema streamTaskSchema[] = { static const SSysDbTableSchema streamTaskSchema[] = {

View File

@ -1481,7 +1481,6 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
if (pShow->pIter == NULL) break; if (pShow->pIter == NULL) break;
SColumnInfoData *pColInfo; SColumnInfoData *pColInfo;
SName n;
int32_t cols = 0; int32_t cols = 0;
char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
@ -1534,6 +1533,21 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&trigger, false); colDataSetVal(pColInfo, numOfRows, (const char *)&trigger, false);
char sinkQuota[20 + VARSTR_HEADER_SIZE] = {0};
sinkQuota[0] = '0';
char dstStr[20] = {0};
STR_TO_VARSTR(dstStr, sinkQuota)
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char*) dstStr, false);
char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0};
strcpy(scanHistoryIdle, "100a");
memset(dstStr, 0, tListLen(dstStr));
STR_TO_VARSTR(dstStr, scanHistoryIdle)
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char*) dstStr, false);
numOfRows++; numOfRows++;
sdbRelease(pSdb, pStream); sdbRelease(pSdb, pStream);
} }

View File

@ -213,8 +213,10 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
bool alreadyRestored = pTq->pVnode->restored;
// do not launch the stream tasks, if it is a follower or not restored vnode. // do not launch the stream tasks, if it is a follower or not restored vnode.
if (!(vnodeIsRoleLeader(pTq->pVnode) && pTq->pVnode->restored)) { if (!(vnodeIsRoleLeader(pTq->pVnode) && alreadyRestored)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -256,7 +258,9 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
return -1; return -1;
} }
tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d", vgId, numOfTasks); tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, restored:%d", vgId, numOfTasks,
alreadyRestored);
pRunReq->head.vgId = vgId; pRunReq->head.vgId = vgId;
pRunReq->streamId = 0; pRunReq->streamId = 0;
pRunReq->taskId = STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID; pRunReq->taskId = STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID;

View File

@ -75,7 +75,8 @@ struct STokenBucket {
double quotaCapacity; // available capacity for maximum input size, KiloBytes per Second double quotaCapacity; // available capacity for maximum input size, KiloBytes per Second
double quotaRemain; // not consumed bytes per second double quotaRemain; // not consumed bytes per second
double quotaRate; // number of token per second double quotaRate; // number of token per second
int64_t fillTimestamp; // fill timestamp int64_t tokenFillTimestamp; // fill timestamp
int64_t quotaFillTimestamp; // fill timestamp
}; };
struct SStreamQueue { struct SStreamQueue {

View File

@ -299,7 +299,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
continue; continue;
} }
ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId && ASSERT(p->chkInfo.checkpointId <= p->checkpointingId && p->checkpointingId == checkpointId &&
p->chkInfo.checkpointVer <= p->chkInfo.processedVer); p->chkInfo.checkpointVer <= p->chkInfo.processedVer);
p->chkInfo.checkpointId = p->checkpointingId; p->chkInfo.checkpointId = p->checkpointingId;

View File

@ -388,32 +388,36 @@ int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t
pBucket->quotaCapacity = quotaRate * MAX_SMOOTH_BURST_RATIO; pBucket->quotaCapacity = quotaRate * MAX_SMOOTH_BURST_RATIO;
pBucket->quotaRemain = pBucket->quotaCapacity; pBucket->quotaRemain = pBucket->quotaCapacity;
pBucket->fillTimestamp = taosGetTimestampMs(); pBucket->tokenFillTimestamp = taosGetTimestampMs();
pBucket->quotaFillTimestamp = taosGetTimestampMs();
stDebug("s-task:%s sink quotaRate:%.2fMiB, numRate:%d", id, quotaRate, numRate); stDebug("s-task:%s sink quotaRate:%.2fMiB, numRate:%d", id, quotaRate, numRate);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void fillTokenBucket(STokenBucket* pBucket, const char* id) { static void fillTokenBucket(STokenBucket* pBucket, const char* id) {
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
int64_t delta = now - pBucket->fillTimestamp;
int64_t deltaToken = now - pBucket->tokenFillTimestamp;
ASSERT(pBucket->numOfToken >= 0); ASSERT(pBucket->numOfToken >= 0);
int32_t incNum = (delta / 1000.0) * pBucket->numRate; int32_t incNum = (deltaToken / 1000.0) * pBucket->numRate;
if (incNum > 0) { if (incNum > 0) {
pBucket->numOfToken = TMIN(pBucket->numOfToken + incNum, pBucket->numCapacity); pBucket->numOfToken = TMIN(pBucket->numOfToken + incNum, pBucket->numCapacity);
pBucket->fillTimestamp = now; pBucket->tokenFillTimestamp = now;
} }
// increase the new available quota as time goes on // increase the new available quota as time goes on
double incSize = (delta / 1000.0) * pBucket->quotaRate; int64_t deltaQuota = now - pBucket->quotaFillTimestamp;
double incSize = (deltaQuota / 1000.0) * pBucket->quotaRate;
if (incSize > 0) { if (incSize > 0) {
pBucket->quotaRemain = TMIN(pBucket->quotaRemain + incSize, pBucket->quotaCapacity); pBucket->quotaRemain = TMIN(pBucket->quotaRemain + incSize, pBucket->quotaCapacity);
pBucket->fillTimestamp = now; pBucket->quotaFillTimestamp = now;
} }
if (incNum > 0 || incSize > 0) { if (incNum > 0 || incSize > 0) {
stTrace("token/quota available, token:%d inc:%d, quota:%.2fMiB inc:%.3fMiB, ts:%" PRId64 " idle:%" PRId64 "ms, %s", stTrace("token/quota available, token:%d inc:%d, token_TsDelta:%" PRId64
pBucket->numOfToken, incNum, pBucket->quotaRemain, incSize, now, delta, id); ", quota:%.2fMiB inc:%.3fMiB quotaTs:%" PRId64 " now:%" PRId64 "ms, %s",
pBucket->numOfToken, incNum, deltaToken, pBucket->quotaRemain, incSize, deltaQuota, now, id);
} }
} }

View File

@ -1086,7 +1086,6 @@ _end:
} }
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) { int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
stDebug("try to write to cf parname");
#ifdef USE_ROCKSDB #ifdef USE_ROCKSDB
if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) { if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) {
if (tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)) == NULL) { if (tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)) == NULL) {

View File

@ -217,7 +217,7 @@ class TDTestCase:
tdSql.checkEqual(20470,len(tdSql.queryResult)) tdSql.checkEqual(20470,len(tdSql.queryResult))
tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'") tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'")
tdSql.checkEqual(208, len(tdSql.queryResult)) tdSql.checkEqual(210, len(tdSql.queryResult))
tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'") tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'")
tdSql.checkEqual(54, len(tdSql.queryResult)) tdSql.checkEqual(54, len(tdSql.queryResult))