finished elastic block test

This commit is contained in:
AlexDuan 2021-08-23 17:46:48 +08:00
parent 3d79ac346f
commit 661fff6929
5 changed files with 21 additions and 26 deletions

View File

@ -193,7 +193,6 @@ typedef struct {
SList * bufBlockList; SList * bufBlockList;
int64_t pointsAdd; // TODO int64_t pointsAdd; // TODO
int64_t storageAdd; // TODO int64_t storageAdd; // TODO
int64_t commitedMs; // commited ms time , zero is no commit.
} SMemTable; } SMemTable;
typedef struct { typedef struct {

View File

@ -249,7 +249,7 @@ int waitMoment(SQInfo* pQInfo){
taosMsleep(1000); taosMsleep(1000);
used_ms += 1000; used_ms += 1000;
if(isQueryKilled(pQInfo)){ if(isQueryKilled(pQInfo)){
printf(" check query is canceled, sleep break... \n"); printf(" check query is canceled, sleep break... %s\n", pQInfo->sql);
break; break;
} }
} }
@ -626,7 +626,7 @@ int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCo
if (pQInfo == NULL || !isValidQInfo(pQInfo)) { if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
return TSDB_CODE_QRY_INVALID_QHANDLE; return TSDB_CODE_QRY_INVALID_QHANDLE;
} }
qWarn("QId:0x%"PRIx64" query killed becase no memory commit.", pQInfo->qId); qWarn("QId:0x%"PRIx64" be killed(no memory commit).", pQInfo->qId);
setQueryKilled(pQInfo); setQueryKilled(pQInfo);
// wait query stop // wait query stop
@ -647,20 +647,19 @@ int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCo
typedef struct { typedef struct {
int64_t qId; int64_t qId;
int64_t startExecTs; int64_t startExecTs;
int64_t commitedMs;
} SLongQuery; } SLongQuery;
// callbark for sort compare // callbark for sort compare
static int compareLongQuery(const void* p1, const void* p2) { static int compareLongQuery(const void* p1, const void* p2) {
// sort desc // sort desc
SLongQuery* plq1 = (SLongQuery*)p1; SLongQuery* plq1 = *(SLongQuery**)p1;
SLongQuery* plq2 = (SLongQuery*)p2; SLongQuery* plq2 = *(SLongQuery**)p2;
if(plq1->startExecTs == plq2->startExecTs) { if(plq1->startExecTs == plq2->startExecTs) {
return 0; return 0;
} else if(plq1->startExecTs > plq2->startExecTs) { } else if(plq1->startExecTs > plq2->startExecTs) {
return -1;
} else {
return 1; return 1;
} else {
return -1;
} }
} }
@ -686,15 +685,7 @@ static void cbFoundItem(void* handle, void* param1) {
// push to qids // push to qids
SLongQuery* plq = (SLongQuery*)malloc(sizeof(SLongQuery)); SLongQuery* plq = (SLongQuery*)malloc(sizeof(SLongQuery));
plq->qId = qInfo->qId; plq->qId = qInfo->qId;
plq->startExecTs = qInfo->startExecTs; plq->startExecTs = qInfo->startExecTs;
// commitedMs
if(imem) {
plq->commitedMs = imem->commitedMs;
} else {
plq->commitedMs = 0;
}
taosArrayPush(qids, &plq); taosArrayPush(qids, &plq);
} }
@ -735,11 +726,13 @@ bool qFixedNoBlock(void* pRepo, void* pMgmt, int32_t longQueryMs) {
SLongQuery* plq; SLongQuery* plq;
for(i=0; i < cnt; i++) { for(i=0; i < cnt; i++) {
plq = (SLongQuery* )taosArrayGetP(qids, i); plq = (SLongQuery* )taosArrayGetP(qids, i);
printf(" sort i=%d span=%d qid=0x%"PRIx64" exeTime=0x%"PRIx64". \n",(int)i, (int)(now - plq->startExecTs), plq->qId, plq->startExecTs);
if(plq->startExecTs > now) continue; if(plq->startExecTs > now) continue;
if(now - plq->startExecTs >= longQueryMs) { if(now - plq->startExecTs >= longQueryMs) {
qKillQueryByQId(pMgmt, plq->qId, 100, 30); // wait 50*100 ms qKillQueryByQId(pMgmt, plq->qId, 500, 10); // wait 50*100 ms
if(tsdbNoProblem(pRepo)) { if(tsdbNoProblem(pRepo)) {
fixed = true; fixed = true;
qWarn("QId:0x%"PRIx64" fixed problem after kill this query.", plq->qId);
break; break;
} }
} }
@ -755,8 +748,9 @@ bool qFixedNoBlock(void* pRepo, void* pMgmt, int32_t longQueryMs) {
//solve tsdb no block to commit //solve tsdb no block to commit
bool qSolveCommitNoBlock(void* pRepo, void* pMgmt) { bool qSolveCommitNoBlock(void* pRepo, void* pMgmt) {
qWarn("start solve no block problem.");
if(qFixedNoBlock(pRepo, pMgmt, 20*1000)) { if(qFixedNoBlock(pRepo, pMgmt, 20*1000)) {
return true; return true;
} }
return qFixedNoBlock(pRepo, pMgmt, 5*1000); return qFixedNoBlock(pRepo, pMgmt, 5*1000);
} }

View File

@ -216,7 +216,10 @@ void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode, bool bELastic
tsdbFreeBufBlock(pBufBlock); tsdbFreeBufBlock(pBufBlock);
free(pNode); free(pNode);
if(bELastic) if(bELastic)
pPool->nElasticBlocks--; {
pPool->nElasticBlocks--;
printf(" elastic block reduce one ok. current blocks=%d \n", pPool->nElasticBlocks);
}
else else
pPool->nBufBlocks--; pPool->nBufBlocks--;
} }

View File

@ -548,7 +548,6 @@ static void tsdbEndCommit(STsdbRepo *pRepo, int eno) {
pRepo->imem = NULL; pRepo->imem = NULL;
(void)tsdbUnlockRepo(pRepo); (void)tsdbUnlockRepo(pRepo);
//save commited time //save commited time
pIMem->commitedMs = taosGetTimestampMs();
tsdbUnRefMemTable(pRepo, pIMem); tsdbUnRefMemTable(pRepo, pIMem);
tsem_post(&(pRepo->readyToCommit)); tsem_post(&(pRepo->readyToCommit));
} }

View File

@ -40,6 +40,7 @@ int32_t tsdbInsertNewBlock(STsdbRepo * pRepo) {
} else { } else {
pPool->nElasticBlocks ++; pPool->nElasticBlocks ++;
cnt ++ ; cnt ++ ;
printf(" elastic block add one ok. current blocks=%d \n", pPool->nElasticBlocks);
} }
} }
} }
@ -68,7 +69,7 @@ bool tsdbUrgeQueryFree(STsdbRepo * pRepo) {
bool tsdbIdleMemEnough() { bool tsdbIdleMemEnough() {
// TODO config to taos.cfg // TODO config to taos.cfg
int32_t lowestRate = 10; // below 10% idle memory, return not enough memory int32_t lowestRate = 5; // below 10% idle memory, return not enough memory
float memoryUsedMB = 0; float memoryUsedMB = 0;
float memoryAvailMB; float memoryAvailMB;
@ -94,8 +95,9 @@ bool tsdbIdleMemEnough() {
bool tsdbAllowNewBlock(STsdbRepo* pRepo) { bool tsdbAllowNewBlock(STsdbRepo* pRepo) {
//TODO config to taos.cfg //TODO config to taos.cfg
int32_t nMaxElastic = 0; int32_t nMaxElastic = 1;
STsdbBufPool* pPool = pRepo->pPool; STsdbBufPool* pPool = pRepo->pPool;
printf("tsdbAllowNewBlock nElasticBlock(%d) MaxElasticBlocks(%d)\n", pPool->nElasticBlocks, nMaxElastic);
if(pPool->nElasticBlocks >= nMaxElastic) { if(pPool->nElasticBlocks >= nMaxElastic) {
tsdbWarn("tsdbAllowNewBlock return fasle. nElasticBlock(%d) >= MaxElasticBlocks(%d)", pPool->nElasticBlocks, nMaxElastic); tsdbWarn("tsdbAllowNewBlock return fasle. nElasticBlock(%d) >= MaxElasticBlocks(%d)", pPool->nElasticBlocks, nMaxElastic);
return false; return false;
@ -106,9 +108,7 @@ bool tsdbAllowNewBlock(STsdbRepo* pRepo) {
bool tsdbNoProblem(STsdbRepo* pRepo) { bool tsdbNoProblem(STsdbRepo* pRepo) {
if(!tsdbIdleMemEnough()) if(!tsdbIdleMemEnough())
return false; return false;
if(listNEles(pRepo->pPool->bufBlockList) == 0)
if(listNEles(pRepo->pPool->bufBlockList))
return false; return false;
return true; return true;
} }