diff --git a/src/inc/query.h b/src/inc/query.h index 28bd14e66f..0872e3dbaa 100644 --- a/src/inc/query.h +++ b/src/inc/query.h @@ -79,6 +79,8 @@ int32_t qKillQuery(qinfo_t qinfo); //kill by qid int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCount); +bool qSolveCommitNoBlock(void* pRepo, void* pMgmt); + int32_t qQueryCompleted(qinfo_t qinfo); /** @@ -97,15 +99,6 @@ void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle); bool checkQIdEqual(void *qHandle, uint64_t qId); int64_t genQueryId(void); -// util - -typedef struct { - int64_t qId; - int32_t timeMs; -} SLongQuery; -// return SArray* include SLongQuery* -void* qObtainLongQuery(void* qMgmt, int32_t longMs); - #ifdef __cplusplus } #endif diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 5e5ecc2438..52c99a3fe5 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -39,6 +39,7 @@ extern "C" { #define TSDB_STATUS_COMMIT_START 1 #define TSDB_STATUS_COMMIT_OVER 2 +#define TSDB_STATUS_COMMIT_NOBLOCK 3 //commit no block, need to be solved // TSDB STATE DEFINITION #define TSDB_STATE_OK 0x0 @@ -414,6 +415,10 @@ int tsdbSyncRecv(void *pRepo, SOCKET socketFd); // For TSDB Compact int tsdbCompact(STsdbRepo *pRepo); +// For TSDB Health Monitor +bool tsdbAllowNewBlock(STsdbRepo* pRepo); +bool tsdbIdleMemEnough(); + #ifdef __cplusplus } #endif diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index b2e6ed7a7b..d9b01b031d 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -628,8 +628,7 @@ int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCo if (pQInfo == NULL || !isValidQInfo(pQInfo)) { return TSDB_CODE_QRY_INVALID_QHANDLE; } - - qDebug("QInfo:0x%"PRIx64" query killed by qid.", pQInfo->qId); + qWarn("QId:0x%"PRIx64" query killed becase no memory commit.", pQInfo->qId); setQueryKilled(pQInfo); // wait query stop @@ -645,6 +644,13 @@ int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCo return error; } +// local struct +typedef struct { + int64_t qId; + int32_t timeMs; +} SLongQuery; + +// compare int compareLongQuery(const void* p1, const void* p2) { // sort desc SLongQuery* plq1 = (SLongQuery*)p1; @@ -658,7 +664,7 @@ int compareLongQuery(const void* p1, const void* p2) { } } -// util +// longquery void* qObtainLongQuery(void* param, int32_t longMs){ SQueryMgmt* qMgmt = (SQueryMgmt*)param; if(qMgmt == NULL || qMgmt->qinfoPool == NULL) return NULL; @@ -700,4 +706,30 @@ void* qObtainLongQuery(void* param, int32_t longMs){ } return qids; +} + +//solve tsdb no block to commit +bool qSolveCommitNoBlock(void* pRepo, void* pMgmt) { + SQueryMgmt *pQueryMgmt = pMgmt; + int32_t longMs = 2000; // TODO config to taos.cfg + + // qid top list + SArray *qids = (SArray*)qObtainLongQuery(pQueryMgmt, longMs); + if(qids == NULL) return false; + + // kill Query + size_t cnt = taosArrayGetSize(qids); + SLongQuery* plq; + for(size_t i=0; i < cnt; i++) { + plq = (SLongQuery* )taosArrayGetP(qids, i); + qKillQueryByQId(pMgmt, plq->qId, 100, 50); // wait 50*100 ms + + // check break condition + if(tsdbIdleMemEnough() && tsdbAllowNewBlock(pRepo)) { + break; + } + } + // free qids + taosArrayDestroyEx(qids, free); + return true; } \ No newline at end of file diff --git a/src/tsdb/inc/tsdbHealth.h b/src/tsdb/inc/tsdbHealth.h index 9c48f552bb..e70c26f939 100644 --- a/src/tsdb/inc/tsdbHealth.h +++ b/src/tsdb/inc/tsdbHealth.h @@ -19,7 +19,4 @@ bool tsdbUrgeQueryFree(STsdbRepo* pRepo); int32_t tsdbInsertNewBlock(STsdbRepo* pRepo); -bool enoughIdleMemory(); -bool allowNewBlock(STsdbRepo* pRepo); - #endif /* _TD_TSDB_BUFFER_H_ */ diff --git a/src/tsdb/src/tsdbHealth.c b/src/tsdb/src/tsdbHealth.c index 1c1c45c9ae..e5e95e405f 100644 --- a/src/tsdb/src/tsdbHealth.c +++ b/src/tsdb/src/tsdbHealth.c @@ -24,19 +24,12 @@ #include "tsdbHealth.h" #include "ttimer.h" -#include "../../vnode/inc/vnodeInt.h" -// get qmgmt -void* vnodeGetqMgmt(void* pVnode){ - if(pVnode == NULL) return NULL; - return ((SVnodeObj*)pVnode)->qMgmt; -} - // return malloc new block count int32_t tsdbInsertNewBlock(STsdbRepo * pRepo) { STsdbBufPool *pPool = pRepo->pPool; int32_t cnt = 0; - if(enoughIdleMemory() && allowNewBlock(pRepo)) { + if(tsdbIdleMemEnough() && tsdbAllowNewBlock(pRepo)) { STsdbBufBlock *pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize); if (pBufBlock) { if (tsdbLockRepo(pRepo) >= 0) { @@ -57,36 +50,10 @@ int32_t tsdbInsertNewBlock(STsdbRepo * pRepo) { // switch anther thread to run void cbKillQueryFree(void* param1, void* param2) { STsdbRepo* pRepo = (STsdbRepo*)param1; - int32_t longMs = 2000; // TODO config to taos.cfg - // vnode - void* vnodeObj = pRepo->appH.appH; - if(vnodeObj == NULL) return ; - - // qMgmt - void* qMgmt = vnodeGetqMgmt(vnodeObj); - if(qMgmt == NULL) return ; - - // qid top list - SArray *qids = (SArray*)qObtainLongQuery(qMgmt, longMs); - if(qids == NULL) return ; - - // kill Query - size_t cnt = taosArrayGetSize(qids); - int64_t qId = 0; - for(size_t i=0; i < cnt; i++) { - qId = *(int64_t*)taosArrayGetP(qids, i); - qKillQueryByQId(qMgmt, qId, 100, 50); // wait 50*100 ms - // notify wait - pthread_cond_signal(&pRepo->pPool->poolNotEmpty); - // check break condition - if(enoughIdleMemory() && allowNewBlock(pRepo)) { - break; - } + if(pRepo->appH.notifyStatus) { + pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_NOBLOCK, TSDB_CODE_SUCCESS); } - - // free qids - taosArrayDestroyEx(qids, free); } // return true do free , false do nothing @@ -96,7 +63,7 @@ bool tsdbUrgeQueryFree(STsdbRepo * pRepo) { return hTimer != NULL; } -bool enoughIdleMemory(){ +bool tsdbIdleMemEnough() { // TODO config to taos.cfg int32_t lowestRate = 20; // below 20% idle memory, return not enough memory float memoryUsedMB = 0; @@ -122,7 +89,7 @@ bool enoughIdleMemory(){ return true; } -bool allowNewBlock(STsdbRepo* pRepo){ +bool tsdbAllowNewBlock(STsdbRepo* pRepo) { //TODO config to taos.cfg int32_t nElasticBlocks = 10; STsdbBufPool* pPool = pRepo->pPool; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index f826c1aecd..c823880ae2 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -560,5 +560,10 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) { return vnodeSaveVersion(pVnode); } + // timer thread callback + if(status == TSDB_STATUS_COMMIT_NOBLOCK) { + qSolveCommitNoBlock(pVnode->tsdb, pVnode->qMgmt); + } + return 0; }