diff --git a/src/inc/query.h b/src/inc/query.h index fb9cbff858..28bd14e66f 100644 --- a/src/inc/query.h +++ b/src/inc/query.h @@ -76,6 +76,9 @@ void* qGetResultRetrieveMsg(qinfo_t qinfo); */ int32_t qKillQuery(qinfo_t qinfo); +//kill by qid +int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCount); + int32_t qQueryCompleted(qinfo_t qinfo); /** @@ -94,6 +97,15 @@ void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle); bool checkQIdEqual(void *qHandle, uint64_t qId); int64_t genQueryId(void); +// util + +typedef struct { + int64_t qId; + int32_t timeMs; +} SLongQuery; +// return SArray* include SLongQuery* +void* qObtainLongQuery(void* qMgmt, int32_t longMs); + #ifdef __cplusplus } #endif diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 368658377c..b04970e85d 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -35,6 +35,7 @@ int32_t* taosGetErrno(); #define terrno (*taosGetErrno()) #define TSDB_CODE_SUCCESS 0 +#define TSDB_CODE_FAILED -1 // unknown or needn't tell detail error // rpc #define TSDB_CODE_RPC_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0001) //"Action in progress") diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 7abe3e99c7..5e5ecc2438 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -192,6 +192,7 @@ typedef struct { SList * bufBlockList; int64_t pointsAdd; // TODO int64_t storageAdd; // TODO + int64_t commitedMs; // commited ms time , zero is no commit. } SMemTable; typedef struct { diff --git a/src/inc/vnode.h b/src/inc/vnode.h index b3291645c0..2cc56af9de 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -88,6 +88,9 @@ int32_t vnodeWriteToRQueue(void *pVnode, void *pCont, int32_t contLen, int8_t qt void vnodeFreeFromRQueue(void *pVnode, SVReadMsg *pRead); int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pRead); +// util +void* vnodeGetqMgmt(void* pVnode); + #ifdef __cplusplus } #endif diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index bb3f262c98..27680a7151 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -219,6 +219,9 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi int waitMoment(SQInfo* pQInfo){ if(pQInfo->sql) { int ms = 0; + char* pcnt = strstr(pQInfo->sql, " count(*)"); + if(pcnt) return 0; + char* pos = strstr(pQInfo->sql, " t_"); if(pos){ pos += 3; @@ -604,3 +607,87 @@ void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle) { taosCacheRelease(pQueryMgmt->qinfoPool, pQInfo, freeHandle); return 0; } + +//kill by qid +int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCount) { + int32_t error = TSDB_CODE_SUCCESS; + void** handle = qAcquireQInfo(pMgmt, qId); + if(handle == NULL) return terrno; + + SQInfo* pQInfo = (SQInfo*)(*handle); + if (pQInfo == NULL || !isValidQInfo(pQInfo)) { + return TSDB_CODE_QRY_INVALID_QHANDLE; + } + + qDebug("QInfo:0x%"PRIx64" query killed by qid.", pQInfo->qId); + setQueryKilled(pQInfo); + + // wait query stop + int32_t loop = 0; + while (pQInfo->owner != 0) { + taosMsleep(waitMs); + if(loop++ > waitCount){ + error = TSDB_CODE_FAILED; + break; + } + } + + return error; +} + +int compareLongQuery(const void* p1, const void* p2) { + // sort desc + SLongQuery* plq1 = (SLongQuery*)p1; + SLongQuery* plq2 = (SLongQuery*)p2; + if(plq1->timeMs == plq2->timeMs) { + return 0; + } else if(plq1->timeMs > plq2->timeMs) { + return -1; + } else { + return 1; + } +} + +// util +void* qObtainLongQuery(void* param, int32_t longMs){ + SQueryMgmt* qMgmt = (SQueryMgmt*)param; + if(qMgmt == NULL || qMgmt->qinfoPool == NULL) return NULL; + SArray* qids = taosArrayInit(4, sizeof(int64_t*)); + + SHashObj* pHashTable = qMgmt->qinfoPool->pHashTable; + if(pHashTable == NULL || pHashTable->hashList == NULL) return NULL; + + SQInfo * qInfo = (SQInfo*)taosHashIterate(pHashTable, NULL); + while(qInfo){ + // judge long query + SMemTable* imem = qInfo->runtimeEnv.pQueryAttr->memRef.snapshot.imem; + if(imem == NULL || imem->commitedMs == 0) continue; + int64_t now = taosGetTimestampMs(); + if(imem->commitedMs > now) continue; // weird, so skip + + int32_t passMs = now - imem->commitedMs; + if(passMs < longMs) { + continue; + } + + // push + SLongQuery* plq = (SLongQuery*)malloc(sizeof(SLongQuery)); + plq->timeMs = passMs; + plq->qId = qInfo->qId; + taosArrayPush(qids, plq); + + // next + qInfo = (SQInfo*)taosHashIterate(pHashTable, qInfo); + } + + size_t cnt = taosArrayGetSize(qids); + if(cnt == 0) { + taosArrayDestroyEx(qids, free); + return NULL; + } + if(cnt > 1) { + taosArraySort(qids, compareLongQuery); + } + + return qids; +} \ No newline at end of file diff --git a/src/tsdb/inc/tsdbBuffer.h b/src/tsdb/inc/tsdbBuffer.h index ec6b057aef..17919c284e 100644 --- a/src/tsdb/inc/tsdbBuffer.h +++ b/src/tsdb/inc/tsdbBuffer.h @@ -43,4 +43,8 @@ SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo); int tsdbExpandPool(STsdbRepo* pRepo, int32_t oldTotalBlocks); void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode); +// health cite +STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize); +void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock); + #endif /* _TD_TSDB_BUFFER_H_ */ diff --git a/src/tsdb/inc/tsdbHealth.h b/src/tsdb/inc/tsdbHealth.h new file mode 100644 index 0000000000..9c48f552bb --- /dev/null +++ b/src/tsdb/inc/tsdbHealth.h @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_TSDB_HEALTH_H_ +#define _TD_TSDB_HEALTH_H_ + +bool tsdbUrgeQueryFree(STsdbRepo* pRepo); +int32_t tsdbInsertNewBlock(STsdbRepo* pRepo); + +bool enoughIdleMemory(); +bool allowNewBlock(STsdbRepo* pRepo); + +#endif /* _TD_TSDB_BUFFER_H_ */ diff --git a/src/tsdb/src/tsdbBuffer.c b/src/tsdb/src/tsdbBuffer.c index e675bf6f9d..a8d800208c 100644 --- a/src/tsdb/src/tsdbBuffer.c +++ b/src/tsdb/src/tsdbBuffer.c @@ -14,12 +14,10 @@ */ #include "tsdbint.h" +#include "tsdbHealth.h" #define POOL_IS_EMPTY(b) (listNEles((b)->bufBlockList) == 0) -static STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize); -static void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock); - // ---------------- INTERNAL FUNCTIONS ---------------- STsdbBufPool *tsdbNewBufPool() { STsdbBufPool *pBufPool = (STsdbBufPool *)calloc(1, sizeof(*pBufPool)); @@ -120,6 +118,14 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) { STsdbBufPool *pBufPool = pRepo->pPool; while (POOL_IS_EMPTY(pBufPool)) { + // supply new Block + if(tsdbInsertNewBlock(pRepo) > 0) { + break; + } else { + // no newBlock, kill query free + tsdbUrgeQueryFree(pRepo); + } + pRepo->repoLocked = false; pthread_cond_wait(&(pBufPool->poolNotEmpty), &(pRepo->mutex)); pRepo->repoLocked = true; @@ -139,7 +145,7 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) { } // ---------------- LOCAL FUNCTIONS ---------------- -static STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize) { +STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize) { STsdbBufBlock *pBufBlock = (STsdbBufBlock *)malloc(sizeof(*pBufBlock) + bufBlockSize); if (pBufBlock == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; @@ -157,7 +163,7 @@ _err: return NULL; } -static void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); } + void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); } int tsdbExpandPool(STsdbRepo* pRepo, int32_t oldTotalBlocks) { if (oldTotalBlocks == pRepo->config.totalBlocks) { @@ -199,4 +205,4 @@ void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode) { tsdbFreeBufBlock(pBufBlock); free(pNode); pPool->nBufBlocks--; -} +} \ No newline at end of file diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 8f5f885d69..6fae5c6555 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -547,6 +547,8 @@ static void tsdbEndCommit(STsdbRepo *pRepo, int eno) { (void)tsdbLockRepo(pRepo); pRepo->imem = NULL; (void)tsdbUnlockRepo(pRepo); + //save commited time + pIMem->commitedMs = taosGetTimestampMs(); tsdbUnRefMemTable(pRepo, pIMem); tsem_post(&(pRepo->readyToCommit)); } diff --git a/src/tsdb/src/tsdbHealth.c b/src/tsdb/src/tsdbHealth.c new file mode 100644 index 0000000000..7cd6672e93 --- /dev/null +++ b/src/tsdb/src/tsdbHealth.c @@ -0,0 +1,132 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "os.h" +#include "taosmsg.h" +#include "tarray.h" +#include "query.h" +#include "tglobal.h" +#include "tsdbint.h" +#include "tsdbBuffer.h" +#include "tsdbLog.h" +#include "tsdbHealth.h" +#include "tsdbint.h" +#include "ttimer.h" +#include "vnode.h" + + +// return malloc new block count +int32_t tsdbInsertNewBlock(STsdbRepo * pRepo) { + STsdbBufPool *pPool = pRepo->pPool; + int32_t cnt = 0; + + if(enoughIdleMemory() && allowNewBlock(pRepo)) { + STsdbBufBlock *pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize); + if (pBufBlock) { + if (tsdbLockRepo(pRepo) >= 0) { + if (tdListAppend(pPool->bufBlockList, (void *)(&pBufBlock)) < 0) { + // append error + tsdbFreeBufBlock(pBufBlock); + } else { + pPool->nRecycleBlocks ++; + cnt ++ ; + } + tsdbUnlockRepo(pRepo); + } + } + } + return cnt; +} + +// switch anther thread to run +void cbKillQueryFree(void* param1, void* param2) { + STsdbRepo* pRepo = (STsdbRepo*)param1; + int32_t longMs = 2000; // TODO config to taos.cfg + + // vnode + void* vnodeObj = pRepo->appH.appH; + if(vnodeObj == NULL) return ; + + // qMgmt + void* qMgmt = vnodeGetqMgmt(vnodeObj); + if(qMgmt == NULL) return ; + + // qid top list + SArray *qids = (SArray*)qObtainLongQuery(qMgmt, longMs); + if(qids == NULL) return ; + + // kill Query + size_t cnt = taosArrayGetSize(qids); + int64_t qId = 0; + for(size_t i=0; i < cnt; i++) { + qId = *(int64_t*)taosArrayGetP(qids, i); + qKillQueryByQId(qMgmt, qId, 100, 50); // wait 50*100 ms + // notify wait + pthread_cond_signal(&pRepo->pPool->poolNotEmpty); + // check break condition + if(enoughIdleMemory() && allowNewBlock(pRepo)) { + break; + } + } + + // free qids + taosArrayDestroyEx(qids, free); +} + +// return true do free , false do nothing +bool tsdbUrgeQueryFree(STsdbRepo * pRepo) { + // 1 start timer + tmr_h hTimer = taosTmrStart(cbKillQueryFree, 1, pRepo, NULL); + return hTimer != NULL; +} + +bool enoughIdleMemory(){ + // TODO config to taos.cfg + int32_t lowestRate = 20; // below 20% idle memory, return not enough memory + float memoryUsedMB = 0; + float memoryAvailMB; + + if (true != taosGetSysMemory(&memoryUsedMB)) { + tsdbWarn("tsdbHealth get memory error, return false."); + return false; + } + + if(memoryUsedMB > tsTotalMemoryMB || tsTotalMemoryMB == 0) { + tsdbWarn("tsdbHealth used memory(%d MB) large total memory(%d MB), return false.", (int)memoryUsedMB, (int)tsTotalMemoryMB); + return false; + } + + memoryAvailMB = (float)tsTotalMemoryMB - memoryUsedMB; + int32_t rate = (int32_t)(memoryAvailMB/tsTotalMemoryMB * 100); + if(rate < lowestRate){ + tsdbWarn("tsdbHealth real rate :%d less than lowest rate:%d, so return false.", rate, lowestRate); + return false; + } + + return true; +} + +bool allowNewBlock(STsdbRepo* pRepo){ + //TODO config to taos.cfg + int32_t nElasticBlocks = 10; + STsdbBufPool* pPool = pRepo->pPool; + int32_t nOverBlocks = pPool->nBufBlocks - pRepo->config.totalBlocks; + if(nOverBlocks > nElasticBlocks) { + tsdbWarn("tsdbHealth allowNewBlock forbid. nOverBlocks(%d) > nElasticBlocks(%d)", nOverBlocks, nElasticBlocks); + return false; + } + + return true; +} diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index f826c1aecd..7802d2a081 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -562,3 +562,9 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) { return 0; } + +// get qmgmt +void* vnodeGetqMgmt(void* pVnode){ + if(pVnode == NULL) return NULL; + return ((SVnodeObj*)pVnode)->qMgmt; +}