diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 49f26e2c24..078ecbb4db 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -1217,6 +1217,9 @@ static int32_t smlParseCols(const char *data, int32_t len, SArray *cols, char *c kv->value = value; kv->length = valueLen; if (isTag) { + if(valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE){ + return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN; + } kv->type = TSDB_DATA_TYPE_NCHAR; } else { int32_t ret = smlParseValue(kv, msg); diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index 71792a2354..c579f82a9d 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -153,23 +153,22 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * return terrno; } - taosThreadRwlockUnlock(pLock); - int32_t code = 0; SdbInsertFp insertFp = pSdb->insertFps[pRow->type]; if (insertFp != NULL) { code = (*insertFp)(pSdb, pRow->pObj); if (code != 0) { code = terrno; - taosThreadRwlockWrlock(pLock); taosHashRemove(hash, pRow->pObj, keySize); - taosThreadRwlockUnlock(pLock); sdbFreeRow(pSdb, pRow, false); terrno = code; + taosThreadRwlockUnlock(pLock); return terrno; } } + taosThreadRwlockUnlock(pLock); + if (pSdb->keyTypes[pRow->type] == SDB_KEY_INT32) { pSdb->maxId[pRow->type] = TMAX(pSdb->maxId[pRow->type], *((int32_t *)pRow->pObj)); } @@ -194,7 +193,6 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * SSdbRow *pOldRow = *ppOldRow; pOldRow->status = pRaw->status; sdbPrintOper(pSdb, pOldRow, "update"); - taosThreadRwlockUnlock(pLock); int32_t code = 0; SdbUpdateFp updateFp = pSdb->updateFps[pNewRow->type]; @@ -202,6 +200,7 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * code = (*updateFp)(pSdb, pOldRow->pObj, pNewRow->pObj); } + taosThreadRwlockUnlock(pLock); sdbFreeRow(pSdb, pNewRow, false); pSdb->tableVer[pOldRow->type]++; diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 4e2c762cd3..30a6188db0 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -174,6 +174,10 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg); int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); void tsdbMemTableDestroy(SMemTable *pMemTable); void tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData); +void tsdbRefMemTable(SMemTable *pMemTable); +void tsdbUnrefMemTable(SMemTable *pMemTable); +int32_t tsdbTakeMemSnapshot(STsdb *pTsdb, SMemTable **ppMem, SMemTable **ppIMem); +void tsdbUntakeMemSnapshot(STsdb *pTsdb, SMemTable *pMem, SMemTable *pIMem); // STbDataIter int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter); void *tsdbTbDataIterDestroy(STbDataIter *pIter); @@ -273,23 +277,14 @@ typedef struct { } SRtn; struct STsdb { - char *path; - SVnode *pVnode; - TdThreadMutex mutex; - bool repoLocked; - STsdbKeepCfg keepCfg; - SMemTable *mem; - SMemTable *imem; - SRtn rtn; - STsdbFS *fs; - SLRUCache *lruCache; -}; - -struct STable { - uint64_t suid; - uint64_t uid; - STSchema *pSchema; // latest schema - STSchema *pCacheSchema; // cached cache + char *path; + SVnode *pVnode; + STsdbKeepCfg keepCfg; + TdThreadRwlock rwLock; + SMemTable *mem; + SMemTable *imem; + STsdbFS *pFS; + SLRUCache *lruCache; }; struct TSDBKEY { @@ -330,21 +325,19 @@ struct STbData { }; struct SMemTable { - SRWLatch latch; - STsdb *pTsdb; - int32_t nRef; - TSKEY minKey; - TSKEY maxKey; - int64_t minVersion; - int64_t maxVersion; - int64_t nRow; - int64_t nDel; - SArray *aTbData; // SArray + SRWLatch latch; + STsdb *pTsdb; + SVBufPool *pPool; + volatile int32_t nRef; + TSKEY minKey; + TSKEY maxKey; + int64_t minVersion; + int64_t maxVersion; + int64_t nRow; + int64_t nDel; + SArray *aTbData; // SArray }; -int tsdbLockRepo(STsdb *pTsdb); -int tsdbUnlockRepo(STsdb *pTsdb); - struct TSDBROW { int8_t type; // 0 for row from tsRow, 1 for row from block data union { diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index cb25e93cde..984b34814d 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -62,12 +62,13 @@ struct SVBufPoolNode { }; struct SVBufPool { - SVBufPool* next; - int64_t nRef; - int64_t size; - uint8_t* ptr; - SVBufPoolNode* pTail; - SVBufPoolNode node; + SVBufPool* next; + SVnode* pVnode; + volatile int32_t nRef; + int64_t size; + uint8_t* ptr; + SVBufPoolNode* pTail; + SVBufPoolNode node; }; int32_t vnodeOpenBufPool(SVnode* pVnode, int64_t size); @@ -78,7 +79,7 @@ void vnodeBufPoolReset(SVBufPool* pPool); int32_t vnodeQueryOpen(SVnode* pVnode); void vnodeQueryClose(SVnode* pVnode); int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg); -int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg); +int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg); // vnodeCommit.c int32_t vnodeBegin(SVnode* pVnode); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index fb403f79a7..d724bcd3be 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -77,6 +77,8 @@ typedef struct SSnapDataHdr SSnapDataHdr; // vnd.h void* vnodeBufPoolMalloc(SVBufPool* pPool, int size); void vnodeBufPoolFree(SVBufPool* pPool, void* p); +void vnodeBufPoolRef(SVBufPool* pPool); +void vnodeBufPoolUnRef(SVBufPool* pPool); // meta typedef struct SMCtbCursor SMCtbCursor; @@ -247,26 +249,26 @@ struct STsdbKeepCfg { }; struct SVnode { - char* path; - SVnodeCfg config; - SVState state; - STfs* pTfs; - SMsgCb msgCb; - SVBufPool* pPool; - SVBufPool* inUse; - SVBufPool* onCommit; - SVBufPool* onRecycle; - SMeta* pMeta; - SSma* pSma; - STsdb* pTsdb; - SWal* pWal; - STQ* pTq; - SSink* pSink; - tsem_t canCommit; - int64_t sync; - int32_t blockCount; - tsem_t syncSem; - SQHandle* pQuery; + char* path; + SVnodeCfg config; + SVState state; + STfs* pTfs; + SMsgCb msgCb; + TdThreadMutex mutex; + TdThreadCond poolNotEmpty; + SVBufPool* pPool; + SVBufPool* inUse; + SMeta* pMeta; + SSma* pSma; + STsdb* pTsdb; + SWal* pWal; + STQ* pTq; + SSink* pSink; + tsem_t canCommit; + int64_t sync; + int32_t blockCount; + tsem_t syncSem; + SQHandle* pQuery; }; #define TD_VID(PVNODE) ((PVNODE)->config.vgId) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 723caca5d7..484020e6e1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -464,7 +464,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { switch (state->state) { case SFSNEXTROW_FS: - state->aDFileSet = state->pTsdb->fs->cState->aDFileSet; + state->aDFileSet = state->pTsdb->pFS->cState->aDFileSet; state->nFileSet = taosArrayGetSize(state->aDFileSet); state->iFileSet = state->nFileSet; @@ -793,6 +793,9 @@ typedef struct { TSDBROW memRow, imemRow, fsRow; TsdbNextRowState input[3]; + SMemTable *pMemTable; + SMemTable *pIMemTable; + STsdb *pTsdb; } CacheNextRowIter; static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb) { @@ -800,21 +803,25 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs tb_uid_t suid = getTableSuidByUid(uid, pTsdb); + tsdbTakeMemSnapshot(pTsdb, &pIter->pMemTable, &pIter->pIMemTable); + STbData *pMem = NULL; - if (pTsdb->mem) { - tsdbGetTbDataFromMemTable(pTsdb->mem, suid, uid, &pMem); + if (pIter->pMemTable) { + tsdbGetTbDataFromMemTable(pIter->pMemTable, suid, uid, &pMem); } STbData *pIMem = NULL; - if (pTsdb->imem) { - tsdbGetTbDataFromMemTable(pTsdb->imem, suid, uid, &pIMem); + if (pIter->pIMemTable) { + tsdbGetTbDataFromMemTable(pIter->pIMemTable, suid, uid, &pIMem); } + pIter->pTsdb = pTsdb; + pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY)); SDelIdx delIdx; - SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState); + SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->pFS->cState); if (pDelFile) { SDelFReader *pDelFReader; @@ -878,6 +885,8 @@ static int32_t nextRowIterClose(CacheNextRowIter *pIter) { taosArrayDestroy(pIter->pSkyline); } + tsdbUntakeMemSnapshot(pIter->pTsdb, pIter->pMemTable, pIter->pIMemTable); + return code; _err: return code; @@ -1189,7 +1198,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo SDelIdx delIdx; - SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState); + SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->pFS->cState); if (pDelFile) { SDelFReader *pDelFReader; @@ -1377,7 +1386,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) { SDelIdx delIdx; - SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState); + SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->pFS->cState); if (pDelFile) { SDelFReader *pDelFReader; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 470bff1eae..13f310ae27 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -64,9 +64,26 @@ int32_t tsdbBegin(STsdb *pTsdb) { if (!pTsdb) return code; - code = tsdbMemTableCreate(pTsdb, &pTsdb->mem); + SMemTable *pMemTable; + code = tsdbMemTableCreate(pTsdb, &pMemTable); if (code) goto _err; + // lock + code = taosThreadRwlockWrlock(&pTsdb->rwLock); + if (code) { + code = TAOS_SYSTEM_ERROR(code); + goto _err; + } + + pTsdb->mem = pMemTable; + + // unlock + code = taosThreadRwlockUnlock(&pTsdb->rwLock); + if (code) { + code = TAOS_SYSTEM_ERROR(code); + goto _err; + } + return code; _err: @@ -83,9 +100,11 @@ int32_t tsdbCommit(STsdb *pTsdb) { // check if (pMemTable->nRow == 0 && pMemTable->nDel == 0) { - // TODO: lock? + taosThreadRwlockWrlock(&pTsdb->rwLock); pTsdb->mem = NULL; - tsdbMemTableDestroy(pMemTable); + taosThreadRwlockUnlock(&pTsdb->rwLock); + + tsdbUnrefMemTable(pMemTable); goto _exit; } @@ -139,7 +158,7 @@ static int32_t tsdbCommitDelStart(SCommitter *pCommitter) { goto _err; } - SDelFile *pDelFileR = pTsdb->fs->nState->pDelFile; + SDelFile *pDelFileR = pTsdb->pFS->nState->pDelFile; if (pDelFileR) { code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR, pTsdb, NULL); if (code) goto _err; @@ -228,7 +247,7 @@ static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) { code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter); if (code) goto _err; - code = tsdbFSStateUpsertDelFile(pTsdb->fs->nState, &pCommitter->pDelFWriter->fDel); + code = tsdbFSStateUpsertDelFile(pTsdb->pFS->nState, &pCommitter->pDelFWriter->fDel); if (code) goto _err; code = tsdbDelFWriterClose(&pCommitter->pDelFWriter, 1); @@ -263,7 +282,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { taosArrayClear(pCommitter->aBlockIdx); tMapDataReset(&pCommitter->oBlockMap); tBlockDataReset(&pCommitter->oBlockData); - pRSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, pCommitter->commitFid, TD_EQ); + pRSet = tsdbFSStateGetDFileSet(pTsdb->pFS->nState, pCommitter->commitFid, TD_EQ); if (pRSet) { code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pRSet); if (code) goto _err; @@ -836,7 +855,7 @@ static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) { if (code) goto _err; // upsert SDFileSet - code = tsdbFSStateUpsertDFileSet(pCommitter->pTsdb->fs->nState, tsdbDataFWriterGetWSet(pCommitter->pWriter)); + code = tsdbFSStateUpsertDFileSet(pCommitter->pTsdb->pFS->nState, tsdbDataFWriterGetWSet(pCommitter->pWriter)); if (code) goto _err; // close and sync @@ -941,10 +960,10 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { memset(pCommitter, 0, sizeof(*pCommitter)); ASSERT(pTsdb->mem && pTsdb->imem == NULL); - // lock(); + taosThreadRwlockWrlock(&pTsdb->rwLock); pTsdb->imem = pTsdb->mem; pTsdb->mem = NULL; - // unlock(); + taosThreadRwlockUnlock(&pTsdb->rwLock); pCommitter->pTsdb = pTsdb; pCommitter->commitID = pTsdb->pVnode->state.commitID; @@ -954,7 +973,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows; pCommitter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression; - code = tsdbFSBegin(pTsdb->fs); + code = tsdbFSBegin(pTsdb->pFS); if (code) goto _err; return code; @@ -1135,13 +1154,16 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { SMemTable *pMemTable = pTsdb->imem; if (eno == 0) { - code = tsdbFSCommit(pTsdb->fs); + code = tsdbFSCommit(pTsdb->pFS); } else { - code = tsdbFSRollback(pTsdb->fs); + code = tsdbFSRollback(pTsdb->pFS); } - tsdbMemTableDestroy(pMemTable); + taosThreadRwlockWrlock(&pTsdb->rwLock); pTsdb->imem = NULL; + taosThreadRwlockUnlock(&pTsdb->rwLock); + + tsdbUnrefMemTable(pMemTable); tsdbInfo("vgId:%d tsdb end commit", TD_VID(pTsdb->pVnode)); return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 82de931872..ee8a23e76e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -41,6 +41,7 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) { } taosInitRWLatch(&pMemTable->latch); pMemTable->pTsdb = pTsdb; + pMemTable->pPool = pTsdb->pVnode->inUse; pMemTable->nRef = 1; pMemTable->minKey = TSKEY_MAX; pMemTable->maxKey = TSKEY_MIN; @@ -54,6 +55,7 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) { taosMemoryFree(pMemTable); goto _err; } + vnodeBufPoolRef(pMemTable->pPool); *ppMemTable = pMemTable; return code; @@ -65,6 +67,7 @@ _err: void tsdbMemTableDestroy(SMemTable *pMemTable) { if (pMemTable) { + vnodeBufPoolUnRef(pMemTable->pPool); taosArrayDestroy(pMemTable->aTbData); taosMemoryFree(pMemTable); } @@ -590,3 +593,58 @@ _err: } int32_t tsdbGetNRowsInTbData(STbData *pTbData) { return pTbData->sl.size; } + +void tsdbRefMemTable(SMemTable *pMemTable) { + int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1); + ASSERT(nRef > 0); +} + +void tsdbUnrefMemTable(SMemTable *pMemTable) { + int32_t nRef = atomic_sub_fetch_32(&pMemTable->nRef, 1); + if (nRef == 0) { + tsdbMemTableDestroy(pMemTable); + } +} + +int32_t tsdbTakeMemSnapshot(STsdb *pTsdb, SMemTable **ppMem, SMemTable **ppIMem) { + int32_t code = 0; + + // lock + code = taosThreadRwlockRdlock(&pTsdb->rwLock); + if (code) { + code = TAOS_SYSTEM_ERROR(code); + goto _exit; + } + + // take snapshot + *ppMem = pTsdb->mem; + *ppIMem = pTsdb->imem; + + if (*ppMem) { + tsdbRefMemTable(*ppMem); + } + + if (*ppIMem) { + tsdbRefMemTable(*ppIMem); + } + + // unlock + code = taosThreadRwlockUnlock(&pTsdb->rwLock); + if (code) { + code = TAOS_SYSTEM_ERROR(code); + goto _exit; + } + +_exit: + return code; +} + +void tsdbUntakeMemSnapshot(STsdb *pTsdb, SMemTable *pMem, SMemTable *pIMem) { + if (pMem) { + tsdbUnrefMemTable(pMem); + } + + if (pIMem) { + tsdbUnrefMemTable(pIMem); + } +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbOpen.c b/source/dnode/vnode/src/tsdb/tsdbOpen.c index 1fa582fae5..064c7adf4b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOpen.c +++ b/source/dnode/vnode/src/tsdb/tsdbOpen.c @@ -54,8 +54,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee sprintf(pTsdb->path, "%s%s%s", pVnode->path, TD_DIRSEP, dir); taosRealPath(pTsdb->path, NULL, slen); pTsdb->pVnode = pVnode; - pTsdb->repoLocked = false; - taosThreadMutexInit(&pTsdb->mutex, NULL); + taosThreadRwlockInit(&pTsdb->rwLock, NULL); if (!pKeepCfg) { tsdbSetKeepCfg(&pTsdb->keepCfg, &pVnode->config.tsdbCfg); } else { @@ -67,7 +66,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee tfsMkdir(pVnode->pTfs, pTsdb->path); // open tsdb - if (tsdbFSOpen(pTsdb, &pTsdb->fs) < 0) { + if (tsdbFSOpen(pTsdb, &pTsdb->pFS) < 0) { goto _err; } @@ -88,33 +87,10 @@ _err: int tsdbClose(STsdb **pTsdb) { if (*pTsdb) { - taosThreadMutexDestroy(&(*pTsdb)->mutex); - tsdbFSClose((*pTsdb)->fs); + taosThreadRwlockDestroy(&(*pTsdb)->rwLock); + tsdbFSClose((*pTsdb)->pFS); tsdbCloseCache((*pTsdb)->lruCache); taosMemoryFreeClear(*pTsdb); } return 0; } - -int tsdbLockRepo(STsdb *pTsdb) { - int code = taosThreadMutexLock(&pTsdb->mutex); - if (code != 0) { - tsdbError("vgId:%d, failed to lock tsdb since %s", TD_VID(pTsdb->pVnode), strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(code); - return -1; - } - pTsdb->repoLocked = true; - return 0; -} - -int tsdbUnlockRepo(STsdb *pTsdb) { - // ASSERT(IS_REPO_LOCKED(pTsdb)); - pTsdb->repoLocked = false; - int code = taosThreadMutexUnlock(&pTsdb->mutex); - if (code != 0) { - tsdbError("vgId:%d, failed to unlock tsdb since %s", TD_VID(pTsdb->pVnode), strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(code); - return -1; - } - return 0; -} diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 0b05bb7224..ad12ee60f1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -118,6 +118,8 @@ struct STsdbReader { char* idStr; // query info handle, for debug purpose int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows SBlockLoadSuppInfo suppInfo; + SMemTable* pMem; + SMemTable* pIMem; SIOCostSummary cost; STSchema* pSchema; @@ -1453,38 +1455,71 @@ static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVer (pBlock->minVersion <= pVerRange->maxVer); } -static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock, int32_t order) { - if (pBlockScanInfo->delSkyline == NULL) { - return false; - } - TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0); - TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline); - - // ts is not overlap - if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) { - return false; - } - - int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1; - - // version is not overlap +static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock) { size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline); - for (int32_t i = pBlockScanInfo->fileDelIndex; i < num; i += step) { + + for (int32_t i = pBlockScanInfo->fileDelIndex; i < num; i += 1) { TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i); if (p->ts >= pBlock->minKey.ts && p->ts <= pBlock->maxKey.ts) { if (p->version >= pBlock->minVersion) { return true; } - } else if (p->ts > pBlock->maxKey.ts) { + } else if (p->ts < pBlock->minKey.ts) { // p->ts < pBlock->minKey.ts + if (p->version >= pBlock->minVersion) { + if (i < num - 1) { + TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1); + if (i + 1 == num - 1) { // pnext is the last point + if (pnext->ts >= pBlock->minKey.ts) { + return true; + } + } else { + if (pnext->ts >= pBlock->minKey.ts && pnext->version >= pBlock->minVersion) { + return true; + } + } + } else { // it must be the last point + ASSERT(p->version == 0); + } + } + } else { // (p->ts > pBlock->maxKey.ts) { return false; } } - ASSERT(0); return false; } +static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock, int32_t order) { + if (pBlockScanInfo->delSkyline == NULL) { + return false; + } + + // ts is not overlap + TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0); + TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline); + if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) { + return false; + } + + // version is not overlap + if (ASCENDING_TRAVERSE(order)) { + return doCheckforDatablockOverlap(pBlockScanInfo, pBlock); + } else { + int32_t index = pBlockScanInfo->fileDelIndex; + while(1) { + TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index); + if (p->ts > pBlock->minKey.ts && index > 0) { + index -= 1; + } else { // find the first point that is smaller than the minKey.ts of dataBlock. + break; + } + } + + return doCheckforDatablockOverlap(pBlockScanInfo, pBlock); + } +} + // 1. the version of all rows should be less than the endVersion // 2. current block should not overlap with next neighbor block // 3. current timestamp should not be overlap with each other @@ -1847,8 +1882,8 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea int32_t backward = (!ASCENDING_TRAVERSE(pReader->order)); STbData* d = NULL; - if (pReader->pTsdb->mem != NULL) { - tsdbGetTbDataFromMemTable(pReader->pTsdb->mem, pReader->suid, pBlockScanInfo->uid, &d); + if (pReader->pMem != NULL) { + tsdbGetTbDataFromMemTable(pReader->pMem, pReader->suid, pBlockScanInfo->uid, &d); if (d != NULL) { code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter); if (code == TSDB_CODE_SUCCESS) { @@ -1868,8 +1903,8 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea } STbData* di = NULL; - if (pReader->pTsdb->imem != NULL) { - tsdbGetTbDataFromMemTable(pReader->pTsdb->imem, pReader->suid, pBlockScanInfo->uid, &di); + if (pReader->pIMem != NULL) { + tsdbGetTbDataFromMemTable(pReader->pIMem, pReader->suid, pBlockScanInfo->uid, &di); if (di != NULL) { code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter); if (code == TSDB_CODE_SUCCESS) { @@ -1905,7 +1940,7 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* SArray* pDelData = taosArrayInit(4, sizeof(SDelData)); - SDelFile* pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState); + SDelFile* pDelFile = tsdbFSStateGetDelFile(pTsdb->pFS->cState); if (pDelFile) { SDelFReader* pDelFReader = NULL; code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL); @@ -2795,7 +2830,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl SDataBlockIter* pBlockIter = &pReader->status.blockIter; - STsdbFSState* pFState = pReader->pTsdb->fs->cState; + STsdbFSState* pFState = pReader->pTsdb->pFS->cState; initFilesetIterator(&pReader->status.fileIter, pFState, pReader->order, pReader->idStr); resetDataBlockIterator(&pReader->status.blockIter, pReader->order); @@ -2809,6 +2844,8 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl } } + tsdbTakeMemSnapshot(pReader->pTsdb, &pReader->pMem, &pReader->pIMem); + tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr); return code; @@ -2824,6 +2861,8 @@ void tsdbReaderClose(STsdbReader* pReader) { SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; + tsdbUntakeMemSnapshot(pReader->pTsdb, pReader->pMem, pReader->pIMem); + taosMemoryFreeClear(pSupInfo->plist); taosMemoryFree(pSupInfo->colIds); @@ -3042,7 +3081,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { tsdbDataFReaderClose(&pReader->pFileReader); - STsdbFSState* pFState = pReader->pTsdb->fs->cState; + STsdbFSState* pFState = pReader->pTsdb->pFS->cState; initFilesetIterator(&pReader->status.fileIter, pFState, pReader->order, pReader->idStr); resetDataBlockIterator(&pReader->status.blockIter, pReader->order); resetDataBlockScanInfo(pReader->status.pTableMap); diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index 2ae646a571..137ef9a4a6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -20,10 +20,10 @@ static int32_t tsdbDoRetentionImpl(STsdb *pTsdb, int64_t now, int8_t try, int8_t STsdbFSState *pState; if (try) { - pState = pTsdb->fs->cState; + pState = pTsdb->pFS->cState; *canDo = 0; } else { - pState = pTsdb->fs->nState; + pState = pTsdb->pFS->nState; } for (int32_t iSet = 0; iSet < taosArrayGetSize(pState->aDFileSet); iSet++) { @@ -83,7 +83,7 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) { if (!canDo) goto _exit; // begin - code = tsdbFSBegin(pTsdb->fs); + code = tsdbFSBegin(pTsdb->pFS); if (code) goto _err; // do retention @@ -91,7 +91,7 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) { if (code) goto _err; // commit - code = tsdbFSCommit(pTsdb->fs); + code = tsdbFSCommit(pTsdb->pFS); if (code) goto _err; _exit: @@ -99,6 +99,6 @@ _exit: _err: tsdbError("vgId:%d tsdb do retention failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - tsdbFSRollback(pTsdb->fs); + tsdbFSRollback(pTsdb->pFS); return code; } \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 4886d874a9..fea0254045 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -45,7 +45,7 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { while (true) { if (pReader->pDataFReader == NULL) { - SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->fs->cState, pReader->fid, TD_GT); + SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->pFS->cState, pReader->fid, TD_GT); if (pSet == NULL) goto _exit; @@ -159,7 +159,7 @@ _err: static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) { int32_t code = 0; STsdb* pTsdb = pReader->pTsdb; - SDelFile* pDelFile = pTsdb->fs->cState->pDelFile; + SDelFile* pDelFile = pTsdb->pFS->cState->pDelFile; if (pReader->pDelFReader == NULL) { if (pDelFile == NULL) { @@ -798,7 +798,7 @@ static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) { code = tsdbWriteBlockIdx(pWriter->pDataFWriter, pWriter->aBlockIdxW, NULL); if (code) goto _err; - code = tsdbFSStateUpsertDFileSet(pTsdb->fs->nState, tsdbDataFWriterGetWSet(pWriter->pDataFWriter)); + code = tsdbFSStateUpsertDFileSet(pTsdb->pFS->nState, tsdbDataFWriterGetWSet(pWriter->pDataFWriter)); if (code) goto _err; code = tsdbDataFWriterClose(&pWriter->pDataFWriter, 1); @@ -843,7 +843,7 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3 pWriter->fid = fid; // read - SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, fid, TD_EQ); + SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->pFS->nState, fid, TD_EQ); if (pSet) { code = tsdbDataFReaderOpen(&pWriter->pDataFReader, pTsdb, pSet); if (code) goto _err; @@ -907,7 +907,7 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32 STsdb* pTsdb = pWriter->pTsdb; if (pWriter->pDelFWriter == NULL) { - SDelFile* pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->nState); + SDelFile* pDelFile = tsdbFSStateGetDelFile(pTsdb->pFS->nState); // reader if (pDelFile) { @@ -1017,7 +1017,7 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) { code = tsdbUpdateDelFileHdr(pWriter->pDelFWriter); if (code) goto _err; - code = tsdbFSStateUpsertDelFile(pTsdb->fs->nState, &pWriter->pDelFWriter->fDel); + code = tsdbFSStateUpsertDelFile(pTsdb->pFS->nState, &pWriter->pDelFWriter->fDel); if (code) goto _err; code = tsdbDelFWriterClose(&pWriter->pDelFWriter, 1); @@ -1096,7 +1096,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr goto _err; } - code = tsdbFSBegin(pTsdb->fs); + code = tsdbFSBegin(pTsdb->pFS); if (code) goto _err; *ppWriter = pWriter; @@ -1113,7 +1113,7 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) { STsdbSnapWriter* pWriter = *ppWriter; if (rollback) { - code = tsdbFSRollback(pWriter->pTsdb->fs); + code = tsdbFSRollback(pWriter->pTsdb->pFS); if (code) goto _err; } else { code = tsdbSnapWriteDataEnd(pWriter); @@ -1122,7 +1122,7 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) { code = tsdbSnapWriteDelEnd(pWriter); if (code) goto _err; - code = tsdbFSCommit(pWriter->pTsdb->fs); + code = tsdbFSCommit(pWriter->pTsdb->pFS); if (code) goto _err; } diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index 9ca4dd6efb..d1584f701e 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -17,7 +17,7 @@ /* ------------------------ STRUCTURES ------------------------ */ -static int vnodeBufPoolCreate(int64_t size, SVBufPool **ppPool); +static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool); static int vnodeBufPoolDestroy(SVBufPool *pPool); int vnodeOpenBufPool(SVnode *pVnode, int64_t size) { @@ -28,7 +28,7 @@ int vnodeOpenBufPool(SVnode *pVnode, int64_t size) { for (int i = 0; i < 3; i++) { // create pool - ret = vnodeBufPoolCreate(size, &pPool); + ret = vnodeBufPoolCreate(pVnode, size, &pPool); if (ret < 0) { vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno)); vnodeCloseBufPool(pVnode); @@ -120,7 +120,7 @@ void vnodeBufPoolFree(SVBufPool *pPool, void *p) { } // STATIC METHODS ------------------- -static int vnodeBufPoolCreate(int64_t size, SVBufPool **ppPool) { +static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) { SVBufPool *pPool; pPool = taosMemoryMalloc(sizeof(SVBufPool) + size); @@ -130,6 +130,7 @@ static int vnodeBufPoolCreate(int64_t size, SVBufPool **ppPool) { } pPool->next = NULL; + pPool->pVnode = pVnode; pPool->nRef = 0; pPool->size = 0; pPool->ptr = pPool->node.data; @@ -146,4 +147,26 @@ static int vnodeBufPoolDestroy(SVBufPool *pPool) { vnodeBufPoolReset(pPool); taosMemoryFree(pPool); return 0; +} + +void vnodeBufPoolRef(SVBufPool *pPool) { + int32_t nRef = atomic_fetch_add_32(&pPool->nRef, 1); + ASSERT(nRef > 0); +} + +void vnodeBufPoolUnRef(SVBufPool *pPool) { + int32_t nRef = atomic_sub_fetch_32(&pPool->nRef, 1); + if (nRef == 0) { + SVnode *pVnode = pPool->pVnode; + + vnodeBufPoolReset(pPool); + + taosThreadMutexLock(&pVnode->mutex); + + pPool->next = pVnode->pPool; + pVnode->pPool = pPool; + taosThreadCondSignal(&pVnode->poolNotEmpty); + + taosThreadMutexUnlock(&pVnode->mutex); + } } \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index c969861241..0c8a8a5fd1 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -15,7 +15,7 @@ #include "vnd.h" -#define VND_INFO_FNAME "vnode.json" +#define VND_INFO_FNAME "vnode.json" #define VND_INFO_FNAME_TMP "vnode_tmp.json" static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData); @@ -27,18 +27,18 @@ static void vnodeWaitCommit(SVnode *pVnode); int vnodeBegin(SVnode *pVnode) { // alloc buffer pool - /* pthread_mutex_lock(); */ + taosThreadMutexLock(&pVnode->mutex); while (pVnode->pPool == NULL) { - /* pthread_cond_wait(); */ + taosThreadCondWait(&pVnode->poolNotEmpty, &pVnode->mutex); } pVnode->inUse = pVnode->pPool; + pVnode->inUse->nRef = 1; pVnode->pPool = pVnode->inUse->next; pVnode->inUse->next = NULL; - /* ref pVnode->inUse buffer pool */ - /* pthread_mutex_unlock(); */ + taosThreadMutexUnlock(&pVnode->mutex); pVnode->state.commitID++; // begin meta @@ -217,7 +217,7 @@ int vnodeCommit(SVnode *pVnode) { vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID, pVnode->state.applied); - pVnode->onCommit = pVnode->inUse; + vnodeBufPoolUnRef(pVnode->inUse); pVnode->inUse = NULL; // save info @@ -284,10 +284,6 @@ int vnodeCommit(SVnode *pVnode) { // apply the commit (TODO) walEndSnapshot(pVnode->pWal); - vnodeBufPoolReset(pVnode->onCommit); - pVnode->onCommit->next = pVnode->pPool; - pVnode->pPool = pVnode->onCommit; - pVnode->onCommit = NULL; vInfo("vgId:%d, commit over", TD_VID(pVnode)); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 0914827950..fb2ac722a6 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -89,6 +89,8 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { tsem_init(&pVnode->syncSem, 0, 0); tsem_init(&(pVnode->canCommit), 0, 1); + taosThreadMutexInit(&pVnode->mutex, NULL); + taosThreadCondInit(&pVnode->poolNotEmpty, NULL); // open buffer pool if (vnodeOpenBufPool(pVnode, pVnode->config.isHeap ? 0 : pVnode->config.szBuf / 3) < 0) { @@ -195,6 +197,8 @@ void vnodeClose(SVnode *pVnode) { // destroy handle tsem_destroy(&(pVnode->canCommit)); tsem_destroy(&pVnode->syncSem); + taosThreadCondDestroy(&pVnode->poolNotEmpty); + taosThreadMutexDestroy(&pVnode->mutex); taosMemoryFree(pVnode); } } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 57ba08e8fa..dc44de9cf3 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1196,7 +1196,6 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock if (pResCol->info.colId == pColMatchInfo->colId) { SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId); colDataAssign(pDst, pResCol, pBlock->info.rows, &pInfo->pRes->info); - // taosArraySet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId, pResCol); colExists = true; break; } @@ -1435,6 +1434,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } } } + qDebug("scan rows: %d", pBlockInfo->rows); return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes; @@ -1507,8 +1507,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys goto _error; } - SScanPhysiNode* pScanPhyNode = &pTableScanNode->scan; - + SScanPhysiNode* pScanPhyNode = &pTableScanNode->scan; SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc; pInfo->pTagCond = pTagCond; diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 9d40b15354..25648fdea4 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -5098,12 +5098,7 @@ bool modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { return true; } -static void doModeAdd(SModeInfo* pInfo, char* data, bool isNull) { - // ignore null elements - if (isNull) { - return; - } - +static void doModeAdd(SModeInfo* pInfo, char* data) { int32_t hashKeyBytes = IS_VAR_DATA_TYPE(pInfo->colType) ? varDataTLen(data) : pInfo->colBytes; SModeItem** pHashItem = taosHashGet(pInfo->pHash, data, hashKeyBytes); if (pHashItem == NULL) { @@ -5128,10 +5123,16 @@ int32_t modeFunction(SqlFunctionCtx* pCtx) { SColumnInfoData* pInputCol = pInput->pData[0]; SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; + int32_t numOfElems = 0; int32_t startOffset = pCtx->offset; for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { char* data = colDataGetData(pInputCol, i); - doModeAdd(pInfo, data, colDataIsNull_s(pInputCol, i)); + if (colDataIsNull_s(pInputCol, i)) { + continue; + } + + numOfElems++; + doModeAdd(pInfo, data); if (sizeof(SModeInfo) + pInfo->numOfPoints * (sizeof(SModeItem) + pInfo->colBytes) >= MODE_MAX_RESULT_SIZE) { taosHashCleanup(pInfo->pHash); @@ -5139,7 +5140,7 @@ int32_t modeFunction(SqlFunctionCtx* pCtx) { } } - SET_VAL(pResInfo, 1, 1); + SET_VAL(pResInfo, numOfElems, 1); return TSDB_CODE_SUCCESS; } diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 1b228925b1..90353ef114 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -56,6 +56,7 @@ # unsupport ./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v2.sim # unsupport ./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v3.sim # unsupport ./test.sh -f tsim/dnode/vnode_clean.sim +./test.sh -f tsim/dnode/use_dropped_dnode.sim # ---- import ./test.sh -f tsim/import/basic.sim diff --git a/tests/script/tsim/dnode/use_dropped_dnode.sim b/tests/script/tsim/dnode/use_dropped_dnode.sim new file mode 100644 index 0000000000..9c546e295f --- /dev/null +++ b/tests/script/tsim/dnode/use_dropped_dnode.sim @@ -0,0 +1,133 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 +system sh/deploy.sh -n dnode3 -i 3 +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s start +system sh/exec.sh -n dnode3 -s start +sql connect + +print =============== step1 create dnode2 +sql create dnode $hostname port 7200 +sql create dnode $hostname port 7300 + +$x = 0 +step1: + $ = $x + 1 + sleep 1000 + if $x == 10 then + print ====> dnode not online! + return -1 + endi +sql show dnodes +print ===> $data00 $data01 $data02 $data03 $data04 $data05 +print ===> $data10 $data11 $data12 $data13 $data14 $data15 +print ===> $data20 $data21 $data22 $data23 $data24 $data25 +if $rows != 3 then + return -1 +endi +if $data(1)[4] != ready then + goto step1 +endi +if $data(2)[4] != ready then + goto step1 +endi +if $data(3)[4] != ready then + goto step1 +endi + +print =============== step2 drop dnode 3 +sql drop dnode 3 +sql create dnode $hostname port 7300 + +$x = 0 +step2: + $ = $x + 1 + sleep 1000 + if $x == 10 then + print ====> dnode not online! + return -1 + endi +sql show dnodes +print ===> $data00 $data01 $data02 $data03 $data04 $data05 +print ===> $data10 $data11 $data12 $data13 $data14 $data15 +print ===> $data20 $data21 $data22 $data23 $data24 $data25 +print ===> $data30 $data31 $data32 $data33 $data34 $data35 +if $rows != 3 then + return -1 +endi +if $data(1)[4] != ready then + goto step2 +endi +if $data(2)[4] != ready then + goto step2 +endi +if $data(3)[4] != null then + goto step2 +endi +if $data(4)[4] != offline then + goto step2 +endi + +print =============== step3: create dnode 4 +sleep 1000 +system sh/exec.sh -n dnode3 -s stop -x SIGINT +system sh/deploy.sh -n dnode3 -i 3 +system sh/exec.sh -n dnode3 -s start + +$x = 0 +step3: + $ = $x + 1 + sleep 1000 + if $x == 10 then + print ====> dnode not online! + return -1 + endi +sql show dnodes +print ===> $data00 $data01 $data02 $data03 $data04 $data05 +print ===> $data10 $data11 $data12 $data13 $data14 $data15 +print ===> $data20 $data21 $data22 $data23 $data24 $data25 +print ===> $data30 $data31 $data32 $data33 $data34 $data35 +if $rows != 3 then + return -1 +endi +if $data(1)[4] != ready then + goto step3 +endi +if $data(2)[4] != ready then + goto step3 +endi +if $data(3)[4] != null then + goto step3 +endi +if $data(4)[4] != ready then + goto step3 +endi + +print =============== step4: create mnode 4 +sql create mnode on dnode 4 + +$x = 0 +step4: + $ = $x + 1 + sleep 1000 + if $x == 10 then + print ====> dnode not online! + return -1 + endi +sql show mnodes +print ===> $data00 $data01 $data02 $data03 $data04 $data05 +print ===> $data10 $data11 $data12 $data13 $data14 $data15 +if $rows != 2 then + return -1 +endi +if $data(1)[2] != leader then + goto step4 +endi +if $data(4)[2] != follower then + goto step4 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode2 -s stop -x SIGINT +system sh/exec.sh -n dnode3 -s stop -x SIGINT diff --git a/tests/system-test/7-tmq/stbTagFilter.py b/tests/system-test/7-tmq/stbTagFilter.py deleted file mode 100644 index 1bb3d24bde..0000000000 --- a/tests/system-test/7-tmq/stbTagFilter.py +++ /dev/null @@ -1,260 +0,0 @@ - -import taos -import sys -import time -import socket -import os -import threading -from enum import Enum - -from util.log import * -from util.sql import * -from util.cases import * -from util.dnodes import * -sys.path.append("./7-tmq") -from tmqCommon import * - -class TDTestCase: - def __init__(self): - self.snapshot = 0 - self.vgroups = 4 - self.ctbNum = 1 - self.rowsPerTbl = 10000 - - def init(self, conn, logSql): - tdLog.debug(f"start to excute {__file__}") - tdSql.init(conn.cursor(), False) - - def prepareTestEnv(self): - tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") - paraDict = {'dbName': 'dbt', - 'dropFlag': 1, - 'event': '', - 'vgroups': 4, - 'stbName': 'stb', - 'colPrefix': 'c', - 'tagPrefix': 't', - 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], - 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], - 'ctbPrefix': 'ctb', - 'ctbStartIdx': 0, - 'ctbNum': 1, - 'rowsPerTbl': 100000, - 'batchNum': 1200, - 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 3, - 'showMsg': 1, - 'showRow': 1, - 'snapshot': 0} - - paraDict['vgroups'] = self.vgroups - paraDict['ctbNum'] = self.ctbNum - paraDict['rowsPerTbl'] = self.rowsPerTbl - - tmqCom.initConsumerTable() - tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) - tdLog.info("create stb") - tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) - tdLog.info("create ctb") - tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], - ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) - tdLog.info("insert data") - tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], - ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", - # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - - # tdLog.info("restart taosd to ensure that the data falls into the disk") - # tdSql.query("flush database %s"%(paraDict['dbName'])) - return - - def tmqCase1(self): - tdLog.printNoPrefix("======== test case 1: ") - paraDict = {'dbName': 'dbt', - 'dropFlag': 1, - 'event': '', - 'vgroups': 4, - 'stbName': 'stb', - 'colPrefix': 'c', - 'tagPrefix': 't', - 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], - 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], - 'ctbPrefix': 'ctb', - 'ctbStartIdx': 0, - 'ctbNum': 1, - 'rowsPerTbl': 100000, - 'batchNum': 3000, - 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 5, - 'showMsg': 1, - 'showRow': 1, - 'snapshot': 0} - paraDict['snapshot'] = self.snapshot - paraDict['vgroups'] = self.vgroups - paraDict['ctbNum'] = self.ctbNum - paraDict['rowsPerTbl'] = self.rowsPerTbl - - # update to half tables - # paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) - # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", - # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - # tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], - # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - - tdLog.info("create topics from stb1") - topicFromStb1 = 'topic_stb1' - queryString = "select ts, c1, c2 from %s.%s where t4 == 'shanghai' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName']) - # queryString = "select ts, c1, c2, t4 from %s.%s where t4 == 'shanghai' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName']) - sqlString = "create topic %s as %s" %(topicFromStb1, queryString) - tdLog.info("create topic sql: %s"%sqlString) - tdSql.execute(sqlString) - - # paraDict['ctbNum'] = self.ctbNum - paraDict['rowsPerTbl'] = self.rowsPerTbl - consumerId = 0 - expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2) - topicList = topicFromStb1 - ifcheckdata = 1 - ifManualCommit = 1 - keyList = 'group.id:cgrp1,\ - enable.auto.commit:true,\ - auto.commit.interval.ms:1000,\ - auto.offset.reset:earliest' - tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - - tdLog.info("start consume processor") - tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - - tdLog.info("insert process end, and start to check consume result") - expectRows = 1 - resultList = tmqCom.selectConsumeResult(expectRows) - totalConsumeRows = 0 - for i in range(expectRows): - totalConsumeRows += resultList[i] - - tdLog.info("run select sql from db") - tdSql.query(queryString) - expectrowcnt = tdSql.getRows() - - tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) - if totalConsumeRows != expectrowcnt: - tdLog.exit("tmq consume rows error!") - - tmqCom.checkFileContent(consumerId, queryString) - - tdSql.query("drop topic %s"%topicFromStb1) - tdLog.printNoPrefix("======== test case 1 end ...... ") - - def tmqCase2(self): - tdLog.printNoPrefix("======== test case 2: ") - paraDict = {'dbName': 'dbt', - 'dropFlag': 1, - 'event': '', - 'vgroups': 4, - 'stbName': 'stb', - 'colPrefix': 'c', - 'tagPrefix': 't', - 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], - 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], - 'ctbPrefix': 'ctb', - 'ctbStartIdx': 0, - 'ctbNum': 1, - 'rowsPerTbl': 10000, - 'batchNum': 5000, - 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 5, - 'showMsg': 1, - 'showRow': 1, - 'snapshot': 0} - - paraDict['snapshot'] = self.snapshot - paraDict['vgroups'] = self.vgroups - paraDict['ctbNum'] = self.ctbNum - paraDict['rowsPerTbl'] = self.rowsPerTbl - - tdLog.info("restart taosd to ensure that the data falls into the disk") - tdSql.query("flush database %s"%(paraDict['dbName'])) - - # update to half tables - paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl / 2) - paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) - tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"], - ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - # tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], - # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - - tmqCom.initConsumerTable() - tdLog.info("create topics from stb1") - topicFromStb1 = 'topic_stb1' - queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) - sqlString = "create topic %s as %s" %(topicFromStb1, queryString) - tdLog.info("create topic sql: %s"%sqlString) - tdSql.execute(sqlString) - - # paraDict['ctbNum'] = self.ctbNum - paraDict['rowsPerTbl'] = self.rowsPerTbl - consumerId = 1 - expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2) - topicList = topicFromStb1 - ifcheckdata = 1 - ifManualCommit = 1 - keyList = 'group.id:cgrp1,\ - enable.auto.commit:true,\ - auto.commit.interval.ms:1000,\ - auto.offset.reset:earliest' - tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - - tdLog.info("start consume processor") - tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - - tdLog.info("insert process end, and start to check consume result") - expectRows = 1 - resultList = tmqCom.selectConsumeResult(expectRows) - totalConsumeRows = 0 - for i in range(expectRows): - totalConsumeRows += resultList[i] - - tdSql.query(queryString) - totalRowsInserted = tdSql.getRows() - - tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt)) - - if totalConsumeRows != expectrowcnt: - tdLog.exit("tmq consume rows error!") - - # tmqCom.checkFileContent(consumerId, queryString) - - tdSql.query("drop topic %s"%topicFromStb1) - - tdLog.printNoPrefix("======== test case 2 end ...... ") - - def run(self): - tdSql.prepare() - self.prepareTestEnv() - tdLog.printNoPrefix("=============================================") - tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") - self.tmqCase1() - # self.tmqCase2() - - # self.prepareTestEnv() - # tdLog.printNoPrefix("====================================================================") - # tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") - # self.snapshot = 1 - # self.tmqCase1() - # self.tmqCase2() - - - def stop(self): - tdSql.close() - tdLog.success(f"{__file__} successfully executed") - -event = threading.Event() - -tdCases.addLinux(__file__, TDTestCase()) -tdCases.addWindows(__file__, TDTestCase())