Merge branch '3.0' into test3.0/lihui

This commit is contained in:
plum-lihui 2022-07-19 17:35:30 +08:00
commit 0d20f04f3d
20 changed files with 433 additions and 434 deletions

View File

@ -1217,6 +1217,9 @@ static int32_t smlParseCols(const char *data, int32_t len, SArray *cols, char *c
kv->value = value;
kv->length = valueLen;
if (isTag) {
if(valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE){
return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
}
kv->type = TSDB_DATA_TYPE_NCHAR;
} else {
int32_t ret = smlParseValue(kv, msg);

View File

@ -153,23 +153,22 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
return terrno;
}
taosThreadRwlockUnlock(pLock);
int32_t code = 0;
SdbInsertFp insertFp = pSdb->insertFps[pRow->type];
if (insertFp != NULL) {
code = (*insertFp)(pSdb, pRow->pObj);
if (code != 0) {
code = terrno;
taosThreadRwlockWrlock(pLock);
taosHashRemove(hash, pRow->pObj, keySize);
taosThreadRwlockUnlock(pLock);
sdbFreeRow(pSdb, pRow, false);
terrno = code;
taosThreadRwlockUnlock(pLock);
return terrno;
}
}
taosThreadRwlockUnlock(pLock);
if (pSdb->keyTypes[pRow->type] == SDB_KEY_INT32) {
pSdb->maxId[pRow->type] = TMAX(pSdb->maxId[pRow->type], *((int32_t *)pRow->pObj));
}
@ -194,7 +193,6 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
SSdbRow *pOldRow = *ppOldRow;
pOldRow->status = pRaw->status;
sdbPrintOper(pSdb, pOldRow, "update");
taosThreadRwlockUnlock(pLock);
int32_t code = 0;
SdbUpdateFp updateFp = pSdb->updateFps[pNewRow->type];
@ -202,6 +200,7 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
code = (*updateFp)(pSdb, pOldRow->pObj, pNewRow->pObj);
}
taosThreadRwlockUnlock(pLock);
sdbFreeRow(pSdb, pNewRow, false);
pSdb->tableVer[pOldRow->type]++;

View File

@ -174,6 +174,10 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg);
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable);
void tsdbMemTableDestroy(SMemTable *pMemTable);
void tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData);
void tsdbRefMemTable(SMemTable *pMemTable);
void tsdbUnrefMemTable(SMemTable *pMemTable);
int32_t tsdbTakeMemSnapshot(STsdb *pTsdb, SMemTable **ppMem, SMemTable **ppIMem);
void tsdbUntakeMemSnapshot(STsdb *pTsdb, SMemTable *pMem, SMemTable *pIMem);
// STbDataIter
int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter);
void *tsdbTbDataIterDestroy(STbDataIter *pIter);
@ -273,23 +277,14 @@ typedef struct {
} SRtn;
struct STsdb {
char *path;
SVnode *pVnode;
TdThreadMutex mutex;
bool repoLocked;
STsdbKeepCfg keepCfg;
SMemTable *mem;
SMemTable *imem;
SRtn rtn;
STsdbFS *fs;
SLRUCache *lruCache;
};
struct STable {
uint64_t suid;
uint64_t uid;
STSchema *pSchema; // latest schema
STSchema *pCacheSchema; // cached cache
char *path;
SVnode *pVnode;
STsdbKeepCfg keepCfg;
TdThreadRwlock rwLock;
SMemTable *mem;
SMemTable *imem;
STsdbFS *pFS;
SLRUCache *lruCache;
};
struct TSDBKEY {
@ -330,21 +325,19 @@ struct STbData {
};
struct SMemTable {
SRWLatch latch;
STsdb *pTsdb;
int32_t nRef;
TSKEY minKey;
TSKEY maxKey;
int64_t minVersion;
int64_t maxVersion;
int64_t nRow;
int64_t nDel;
SArray *aTbData; // SArray<STbData*>
SRWLatch latch;
STsdb *pTsdb;
SVBufPool *pPool;
volatile int32_t nRef;
TSKEY minKey;
TSKEY maxKey;
int64_t minVersion;
int64_t maxVersion;
int64_t nRow;
int64_t nDel;
SArray *aTbData; // SArray<STbData*>
};
int tsdbLockRepo(STsdb *pTsdb);
int tsdbUnlockRepo(STsdb *pTsdb);
struct TSDBROW {
int8_t type; // 0 for row from tsRow, 1 for row from block data
union {

View File

@ -62,12 +62,13 @@ struct SVBufPoolNode {
};
struct SVBufPool {
SVBufPool* next;
int64_t nRef;
int64_t size;
uint8_t* ptr;
SVBufPoolNode* pTail;
SVBufPoolNode node;
SVBufPool* next;
SVnode* pVnode;
volatile int32_t nRef;
int64_t size;
uint8_t* ptr;
SVBufPoolNode* pTail;
SVBufPoolNode node;
};
int32_t vnodeOpenBufPool(SVnode* pVnode, int64_t size);
@ -78,7 +79,7 @@ void vnodeBufPoolReset(SVBufPool* pPool);
int32_t vnodeQueryOpen(SVnode* pVnode);
void vnodeQueryClose(SVnode* pVnode);
int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg);
int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg);
int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg);
// vnodeCommit.c
int32_t vnodeBegin(SVnode* pVnode);

View File

@ -77,6 +77,8 @@ typedef struct SSnapDataHdr SSnapDataHdr;
// vnd.h
void* vnodeBufPoolMalloc(SVBufPool* pPool, int size);
void vnodeBufPoolFree(SVBufPool* pPool, void* p);
void vnodeBufPoolRef(SVBufPool* pPool);
void vnodeBufPoolUnRef(SVBufPool* pPool);
// meta
typedef struct SMCtbCursor SMCtbCursor;
@ -247,26 +249,26 @@ struct STsdbKeepCfg {
};
struct SVnode {
char* path;
SVnodeCfg config;
SVState state;
STfs* pTfs;
SMsgCb msgCb;
SVBufPool* pPool;
SVBufPool* inUse;
SVBufPool* onCommit;
SVBufPool* onRecycle;
SMeta* pMeta;
SSma* pSma;
STsdb* pTsdb;
SWal* pWal;
STQ* pTq;
SSink* pSink;
tsem_t canCommit;
int64_t sync;
int32_t blockCount;
tsem_t syncSem;
SQHandle* pQuery;
char* path;
SVnodeCfg config;
SVState state;
STfs* pTfs;
SMsgCb msgCb;
TdThreadMutex mutex;
TdThreadCond poolNotEmpty;
SVBufPool* pPool;
SVBufPool* inUse;
SMeta* pMeta;
SSma* pSma;
STsdb* pTsdb;
SWal* pWal;
STQ* pTq;
SSink* pSink;
tsem_t canCommit;
int64_t sync;
int32_t blockCount;
tsem_t syncSem;
SQHandle* pQuery;
};
#define TD_VID(PVNODE) ((PVNODE)->config.vgId)

View File

@ -464,7 +464,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
switch (state->state) {
case SFSNEXTROW_FS:
state->aDFileSet = state->pTsdb->fs->cState->aDFileSet;
state->aDFileSet = state->pTsdb->pFS->cState->aDFileSet;
state->nFileSet = taosArrayGetSize(state->aDFileSet);
state->iFileSet = state->nFileSet;
@ -793,6 +793,9 @@ typedef struct {
TSDBROW memRow, imemRow, fsRow;
TsdbNextRowState input[3];
SMemTable *pMemTable;
SMemTable *pIMemTable;
STsdb *pTsdb;
} CacheNextRowIter;
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb) {
@ -800,21 +803,25 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
tb_uid_t suid = getTableSuidByUid(uid, pTsdb);
tsdbTakeMemSnapshot(pTsdb, &pIter->pMemTable, &pIter->pIMemTable);
STbData *pMem = NULL;
if (pTsdb->mem) {
tsdbGetTbDataFromMemTable(pTsdb->mem, suid, uid, &pMem);
if (pIter->pMemTable) {
tsdbGetTbDataFromMemTable(pIter->pMemTable, suid, uid, &pMem);
}
STbData *pIMem = NULL;
if (pTsdb->imem) {
tsdbGetTbDataFromMemTable(pTsdb->imem, suid, uid, &pIMem);
if (pIter->pIMemTable) {
tsdbGetTbDataFromMemTable(pIter->pIMemTable, suid, uid, &pIMem);
}
pIter->pTsdb = pTsdb;
pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
SDelIdx delIdx;
SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState);
SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->pFS->cState);
if (pDelFile) {
SDelFReader *pDelFReader;
@ -878,6 +885,8 @@ static int32_t nextRowIterClose(CacheNextRowIter *pIter) {
taosArrayDestroy(pIter->pSkyline);
}
tsdbUntakeMemSnapshot(pIter->pTsdb, pIter->pMemTable, pIter->pIMemTable);
return code;
_err:
return code;
@ -1189,7 +1198,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
SDelIdx delIdx;
SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState);
SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->pFS->cState);
if (pDelFile) {
SDelFReader *pDelFReader;
@ -1377,7 +1386,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
SDelIdx delIdx;
SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState);
SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->pFS->cState);
if (pDelFile) {
SDelFReader *pDelFReader;

View File

@ -64,9 +64,26 @@ int32_t tsdbBegin(STsdb *pTsdb) {
if (!pTsdb) return code;
code = tsdbMemTableCreate(pTsdb, &pTsdb->mem);
SMemTable *pMemTable;
code = tsdbMemTableCreate(pTsdb, &pMemTable);
if (code) goto _err;
// lock
code = taosThreadRwlockWrlock(&pTsdb->rwLock);
if (code) {
code = TAOS_SYSTEM_ERROR(code);
goto _err;
}
pTsdb->mem = pMemTable;
// unlock
code = taosThreadRwlockUnlock(&pTsdb->rwLock);
if (code) {
code = TAOS_SYSTEM_ERROR(code);
goto _err;
}
return code;
_err:
@ -83,9 +100,11 @@ int32_t tsdbCommit(STsdb *pTsdb) {
// check
if (pMemTable->nRow == 0 && pMemTable->nDel == 0) {
// TODO: lock?
taosThreadRwlockWrlock(&pTsdb->rwLock);
pTsdb->mem = NULL;
tsdbMemTableDestroy(pMemTable);
taosThreadRwlockUnlock(&pTsdb->rwLock);
tsdbUnrefMemTable(pMemTable);
goto _exit;
}
@ -139,7 +158,7 @@ static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
goto _err;
}
SDelFile *pDelFileR = pTsdb->fs->nState->pDelFile;
SDelFile *pDelFileR = pTsdb->pFS->nState->pDelFile;
if (pDelFileR) {
code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR, pTsdb, NULL);
if (code) goto _err;
@ -228,7 +247,7 @@ static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter);
if (code) goto _err;
code = tsdbFSStateUpsertDelFile(pTsdb->fs->nState, &pCommitter->pDelFWriter->fDel);
code = tsdbFSStateUpsertDelFile(pTsdb->pFS->nState, &pCommitter->pDelFWriter->fDel);
if (code) goto _err;
code = tsdbDelFWriterClose(&pCommitter->pDelFWriter, 1);
@ -263,7 +282,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
taosArrayClear(pCommitter->aBlockIdx);
tMapDataReset(&pCommitter->oBlockMap);
tBlockDataReset(&pCommitter->oBlockData);
pRSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, pCommitter->commitFid, TD_EQ);
pRSet = tsdbFSStateGetDFileSet(pTsdb->pFS->nState, pCommitter->commitFid, TD_EQ);
if (pRSet) {
code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pRSet);
if (code) goto _err;
@ -836,7 +855,7 @@ static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
if (code) goto _err;
// upsert SDFileSet
code = tsdbFSStateUpsertDFileSet(pCommitter->pTsdb->fs->nState, tsdbDataFWriterGetWSet(pCommitter->pWriter));
code = tsdbFSStateUpsertDFileSet(pCommitter->pTsdb->pFS->nState, tsdbDataFWriterGetWSet(pCommitter->pWriter));
if (code) goto _err;
// close and sync
@ -941,10 +960,10 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
memset(pCommitter, 0, sizeof(*pCommitter));
ASSERT(pTsdb->mem && pTsdb->imem == NULL);
// lock();
taosThreadRwlockWrlock(&pTsdb->rwLock);
pTsdb->imem = pTsdb->mem;
pTsdb->mem = NULL;
// unlock();
taosThreadRwlockUnlock(&pTsdb->rwLock);
pCommitter->pTsdb = pTsdb;
pCommitter->commitID = pTsdb->pVnode->state.commitID;
@ -954,7 +973,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
pCommitter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
code = tsdbFSBegin(pTsdb->fs);
code = tsdbFSBegin(pTsdb->pFS);
if (code) goto _err;
return code;
@ -1135,13 +1154,16 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
SMemTable *pMemTable = pTsdb->imem;
if (eno == 0) {
code = tsdbFSCommit(pTsdb->fs);
code = tsdbFSCommit(pTsdb->pFS);
} else {
code = tsdbFSRollback(pTsdb->fs);
code = tsdbFSRollback(pTsdb->pFS);
}
tsdbMemTableDestroy(pMemTable);
taosThreadRwlockWrlock(&pTsdb->rwLock);
pTsdb->imem = NULL;
taosThreadRwlockUnlock(&pTsdb->rwLock);
tsdbUnrefMemTable(pMemTable);
tsdbInfo("vgId:%d tsdb end commit", TD_VID(pTsdb->pVnode));
return code;

View File

@ -41,6 +41,7 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) {
}
taosInitRWLatch(&pMemTable->latch);
pMemTable->pTsdb = pTsdb;
pMemTable->pPool = pTsdb->pVnode->inUse;
pMemTable->nRef = 1;
pMemTable->minKey = TSKEY_MAX;
pMemTable->maxKey = TSKEY_MIN;
@ -54,6 +55,7 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) {
taosMemoryFree(pMemTable);
goto _err;
}
vnodeBufPoolRef(pMemTable->pPool);
*ppMemTable = pMemTable;
return code;
@ -65,6 +67,7 @@ _err:
void tsdbMemTableDestroy(SMemTable *pMemTable) {
if (pMemTable) {
vnodeBufPoolUnRef(pMemTable->pPool);
taosArrayDestroy(pMemTable->aTbData);
taosMemoryFree(pMemTable);
}
@ -590,3 +593,58 @@ _err:
}
int32_t tsdbGetNRowsInTbData(STbData *pTbData) { return pTbData->sl.size; }
void tsdbRefMemTable(SMemTable *pMemTable) {
int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1);
ASSERT(nRef > 0);
}
void tsdbUnrefMemTable(SMemTable *pMemTable) {
int32_t nRef = atomic_sub_fetch_32(&pMemTable->nRef, 1);
if (nRef == 0) {
tsdbMemTableDestroy(pMemTable);
}
}
int32_t tsdbTakeMemSnapshot(STsdb *pTsdb, SMemTable **ppMem, SMemTable **ppIMem) {
int32_t code = 0;
// lock
code = taosThreadRwlockRdlock(&pTsdb->rwLock);
if (code) {
code = TAOS_SYSTEM_ERROR(code);
goto _exit;
}
// take snapshot
*ppMem = pTsdb->mem;
*ppIMem = pTsdb->imem;
if (*ppMem) {
tsdbRefMemTable(*ppMem);
}
if (*ppIMem) {
tsdbRefMemTable(*ppIMem);
}
// unlock
code = taosThreadRwlockUnlock(&pTsdb->rwLock);
if (code) {
code = TAOS_SYSTEM_ERROR(code);
goto _exit;
}
_exit:
return code;
}
void tsdbUntakeMemSnapshot(STsdb *pTsdb, SMemTable *pMem, SMemTable *pIMem) {
if (pMem) {
tsdbUnrefMemTable(pMem);
}
if (pIMem) {
tsdbUnrefMemTable(pIMem);
}
}

View File

@ -54,8 +54,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
sprintf(pTsdb->path, "%s%s%s", pVnode->path, TD_DIRSEP, dir);
taosRealPath(pTsdb->path, NULL, slen);
pTsdb->pVnode = pVnode;
pTsdb->repoLocked = false;
taosThreadMutexInit(&pTsdb->mutex, NULL);
taosThreadRwlockInit(&pTsdb->rwLock, NULL);
if (!pKeepCfg) {
tsdbSetKeepCfg(&pTsdb->keepCfg, &pVnode->config.tsdbCfg);
} else {
@ -67,7 +66,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
tfsMkdir(pVnode->pTfs, pTsdb->path);
// open tsdb
if (tsdbFSOpen(pTsdb, &pTsdb->fs) < 0) {
if (tsdbFSOpen(pTsdb, &pTsdb->pFS) < 0) {
goto _err;
}
@ -88,33 +87,10 @@ _err:
int tsdbClose(STsdb **pTsdb) {
if (*pTsdb) {
taosThreadMutexDestroy(&(*pTsdb)->mutex);
tsdbFSClose((*pTsdb)->fs);
taosThreadRwlockDestroy(&(*pTsdb)->rwLock);
tsdbFSClose((*pTsdb)->pFS);
tsdbCloseCache((*pTsdb)->lruCache);
taosMemoryFreeClear(*pTsdb);
}
return 0;
}
int tsdbLockRepo(STsdb *pTsdb) {
int code = taosThreadMutexLock(&pTsdb->mutex);
if (code != 0) {
tsdbError("vgId:%d, failed to lock tsdb since %s", TD_VID(pTsdb->pVnode), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
pTsdb->repoLocked = true;
return 0;
}
int tsdbUnlockRepo(STsdb *pTsdb) {
// ASSERT(IS_REPO_LOCKED(pTsdb));
pTsdb->repoLocked = false;
int code = taosThreadMutexUnlock(&pTsdb->mutex);
if (code != 0) {
tsdbError("vgId:%d, failed to unlock tsdb since %s", TD_VID(pTsdb->pVnode), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}

View File

@ -118,6 +118,8 @@ struct STsdbReader {
char* idStr; // query info handle, for debug purpose
int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
SBlockLoadSuppInfo suppInfo;
SMemTable* pMem;
SMemTable* pIMem;
SIOCostSummary cost;
STSchema* pSchema;
@ -1453,38 +1455,71 @@ static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVer
(pBlock->minVersion <= pVerRange->maxVer);
}
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock, int32_t order) {
if (pBlockScanInfo->delSkyline == NULL) {
return false;
}
TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
// ts is not overlap
if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
return false;
}
int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
// version is not overlap
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock) {
size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);
for (int32_t i = pBlockScanInfo->fileDelIndex; i < num; i += step) {
for (int32_t i = pBlockScanInfo->fileDelIndex; i < num; i += 1) {
TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);
if (p->ts >= pBlock->minKey.ts && p->ts <= pBlock->maxKey.ts) {
if (p->version >= pBlock->minVersion) {
return true;
}
} else if (p->ts > pBlock->maxKey.ts) {
} else if (p->ts < pBlock->minKey.ts) { // p->ts < pBlock->minKey.ts
if (p->version >= pBlock->minVersion) {
if (i < num - 1) {
TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
if (i + 1 == num - 1) { // pnext is the last point
if (pnext->ts >= pBlock->minKey.ts) {
return true;
}
} else {
if (pnext->ts >= pBlock->minKey.ts && pnext->version >= pBlock->minVersion) {
return true;
}
}
} else { // it must be the last point
ASSERT(p->version == 0);
}
}
} else { // (p->ts > pBlock->maxKey.ts) {
return false;
}
}
ASSERT(0);
return false;
}
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock, int32_t order) {
if (pBlockScanInfo->delSkyline == NULL) {
return false;
}
// ts is not overlap
TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
return false;
}
// version is not overlap
if (ASCENDING_TRAVERSE(order)) {
return doCheckforDatablockOverlap(pBlockScanInfo, pBlock);
} else {
int32_t index = pBlockScanInfo->fileDelIndex;
while(1) {
TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
if (p->ts > pBlock->minKey.ts && index > 0) {
index -= 1;
} else { // find the first point that is smaller than the minKey.ts of dataBlock.
break;
}
}
return doCheckforDatablockOverlap(pBlockScanInfo, pBlock);
}
}
// 1. the version of all rows should be less than the endVersion
// 2. current block should not overlap with next neighbor block
// 3. current timestamp should not be overlap with each other
@ -1847,8 +1882,8 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));
STbData* d = NULL;
if (pReader->pTsdb->mem != NULL) {
tsdbGetTbDataFromMemTable(pReader->pTsdb->mem, pReader->suid, pBlockScanInfo->uid, &d);
if (pReader->pMem != NULL) {
tsdbGetTbDataFromMemTable(pReader->pMem, pReader->suid, pBlockScanInfo->uid, &d);
if (d != NULL) {
code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
if (code == TSDB_CODE_SUCCESS) {
@ -1868,8 +1903,8 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
}
STbData* di = NULL;
if (pReader->pTsdb->imem != NULL) {
tsdbGetTbDataFromMemTable(pReader->pTsdb->imem, pReader->suid, pBlockScanInfo->uid, &di);
if (pReader->pIMem != NULL) {
tsdbGetTbDataFromMemTable(pReader->pIMem, pReader->suid, pBlockScanInfo->uid, &di);
if (di != NULL) {
code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
if (code == TSDB_CODE_SUCCESS) {
@ -1905,7 +1940,7 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
SArray* pDelData = taosArrayInit(4, sizeof(SDelData));
SDelFile* pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState);
SDelFile* pDelFile = tsdbFSStateGetDelFile(pTsdb->pFS->cState);
if (pDelFile) {
SDelFReader* pDelFReader = NULL;
code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL);
@ -2795,7 +2830,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
STsdbFSState* pFState = pReader->pTsdb->fs->cState;
STsdbFSState* pFState = pReader->pTsdb->pFS->cState;
initFilesetIterator(&pReader->status.fileIter, pFState, pReader->order, pReader->idStr);
resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
@ -2809,6 +2844,8 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
}
}
tsdbTakeMemSnapshot(pReader->pTsdb, &pReader->pMem, &pReader->pIMem);
tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
return code;
@ -2824,6 +2861,8 @@ void tsdbReaderClose(STsdbReader* pReader) {
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
tsdbUntakeMemSnapshot(pReader->pTsdb, pReader->pMem, pReader->pIMem);
taosMemoryFreeClear(pSupInfo->plist);
taosMemoryFree(pSupInfo->colIds);
@ -3042,7 +3081,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
tsdbDataFReaderClose(&pReader->pFileReader);
STsdbFSState* pFState = pReader->pTsdb->fs->cState;
STsdbFSState* pFState = pReader->pTsdb->pFS->cState;
initFilesetIterator(&pReader->status.fileIter, pFState, pReader->order, pReader->idStr);
resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
resetDataBlockScanInfo(pReader->status.pTableMap);

View File

@ -20,10 +20,10 @@ static int32_t tsdbDoRetentionImpl(STsdb *pTsdb, int64_t now, int8_t try, int8_t
STsdbFSState *pState;
if (try) {
pState = pTsdb->fs->cState;
pState = pTsdb->pFS->cState;
*canDo = 0;
} else {
pState = pTsdb->fs->nState;
pState = pTsdb->pFS->nState;
}
for (int32_t iSet = 0; iSet < taosArrayGetSize(pState->aDFileSet); iSet++) {
@ -83,7 +83,7 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
if (!canDo) goto _exit;
// begin
code = tsdbFSBegin(pTsdb->fs);
code = tsdbFSBegin(pTsdb->pFS);
if (code) goto _err;
// do retention
@ -91,7 +91,7 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
if (code) goto _err;
// commit
code = tsdbFSCommit(pTsdb->fs);
code = tsdbFSCommit(pTsdb->pFS);
if (code) goto _err;
_exit:
@ -99,6 +99,6 @@ _exit:
_err:
tsdbError("vgId:%d tsdb do retention failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
tsdbFSRollback(pTsdb->fs);
tsdbFSRollback(pTsdb->pFS);
return code;
}

View File

@ -45,7 +45,7 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
while (true) {
if (pReader->pDataFReader == NULL) {
SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->fs->cState, pReader->fid, TD_GT);
SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->pFS->cState, pReader->fid, TD_GT);
if (pSet == NULL) goto _exit;
@ -159,7 +159,7 @@ _err:
static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0;
STsdb* pTsdb = pReader->pTsdb;
SDelFile* pDelFile = pTsdb->fs->cState->pDelFile;
SDelFile* pDelFile = pTsdb->pFS->cState->pDelFile;
if (pReader->pDelFReader == NULL) {
if (pDelFile == NULL) {
@ -798,7 +798,7 @@ static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) {
code = tsdbWriteBlockIdx(pWriter->pDataFWriter, pWriter->aBlockIdxW, NULL);
if (code) goto _err;
code = tsdbFSStateUpsertDFileSet(pTsdb->fs->nState, tsdbDataFWriterGetWSet(pWriter->pDataFWriter));
code = tsdbFSStateUpsertDFileSet(pTsdb->pFS->nState, tsdbDataFWriterGetWSet(pWriter->pDataFWriter));
if (code) goto _err;
code = tsdbDataFWriterClose(&pWriter->pDataFWriter, 1);
@ -843,7 +843,7 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
pWriter->fid = fid;
// read
SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, fid, TD_EQ);
SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->pFS->nState, fid, TD_EQ);
if (pSet) {
code = tsdbDataFReaderOpen(&pWriter->pDataFReader, pTsdb, pSet);
if (code) goto _err;
@ -907,7 +907,7 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32
STsdb* pTsdb = pWriter->pTsdb;
if (pWriter->pDelFWriter == NULL) {
SDelFile* pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->nState);
SDelFile* pDelFile = tsdbFSStateGetDelFile(pTsdb->pFS->nState);
// reader
if (pDelFile) {
@ -1017,7 +1017,7 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) {
code = tsdbUpdateDelFileHdr(pWriter->pDelFWriter);
if (code) goto _err;
code = tsdbFSStateUpsertDelFile(pTsdb->fs->nState, &pWriter->pDelFWriter->fDel);
code = tsdbFSStateUpsertDelFile(pTsdb->pFS->nState, &pWriter->pDelFWriter->fDel);
if (code) goto _err;
code = tsdbDelFWriterClose(&pWriter->pDelFWriter, 1);
@ -1096,7 +1096,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
goto _err;
}
code = tsdbFSBegin(pTsdb->fs);
code = tsdbFSBegin(pTsdb->pFS);
if (code) goto _err;
*ppWriter = pWriter;
@ -1113,7 +1113,7 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
STsdbSnapWriter* pWriter = *ppWriter;
if (rollback) {
code = tsdbFSRollback(pWriter->pTsdb->fs);
code = tsdbFSRollback(pWriter->pTsdb->pFS);
if (code) goto _err;
} else {
code = tsdbSnapWriteDataEnd(pWriter);
@ -1122,7 +1122,7 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
code = tsdbSnapWriteDelEnd(pWriter);
if (code) goto _err;
code = tsdbFSCommit(pWriter->pTsdb->fs);
code = tsdbFSCommit(pWriter->pTsdb->pFS);
if (code) goto _err;
}

View File

@ -17,7 +17,7 @@
/* ------------------------ STRUCTURES ------------------------ */
static int vnodeBufPoolCreate(int64_t size, SVBufPool **ppPool);
static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool);
static int vnodeBufPoolDestroy(SVBufPool *pPool);
int vnodeOpenBufPool(SVnode *pVnode, int64_t size) {
@ -28,7 +28,7 @@ int vnodeOpenBufPool(SVnode *pVnode, int64_t size) {
for (int i = 0; i < 3; i++) {
// create pool
ret = vnodeBufPoolCreate(size, &pPool);
ret = vnodeBufPoolCreate(pVnode, size, &pPool);
if (ret < 0) {
vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
vnodeCloseBufPool(pVnode);
@ -120,7 +120,7 @@ void vnodeBufPoolFree(SVBufPool *pPool, void *p) {
}
// STATIC METHODS -------------------
static int vnodeBufPoolCreate(int64_t size, SVBufPool **ppPool) {
static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) {
SVBufPool *pPool;
pPool = taosMemoryMalloc(sizeof(SVBufPool) + size);
@ -130,6 +130,7 @@ static int vnodeBufPoolCreate(int64_t size, SVBufPool **ppPool) {
}
pPool->next = NULL;
pPool->pVnode = pVnode;
pPool->nRef = 0;
pPool->size = 0;
pPool->ptr = pPool->node.data;
@ -146,4 +147,26 @@ static int vnodeBufPoolDestroy(SVBufPool *pPool) {
vnodeBufPoolReset(pPool);
taosMemoryFree(pPool);
return 0;
}
void vnodeBufPoolRef(SVBufPool *pPool) {
int32_t nRef = atomic_fetch_add_32(&pPool->nRef, 1);
ASSERT(nRef > 0);
}
void vnodeBufPoolUnRef(SVBufPool *pPool) {
int32_t nRef = atomic_sub_fetch_32(&pPool->nRef, 1);
if (nRef == 0) {
SVnode *pVnode = pPool->pVnode;
vnodeBufPoolReset(pPool);
taosThreadMutexLock(&pVnode->mutex);
pPool->next = pVnode->pPool;
pVnode->pPool = pPool;
taosThreadCondSignal(&pVnode->poolNotEmpty);
taosThreadMutexUnlock(&pVnode->mutex);
}
}

View File

@ -15,7 +15,7 @@
#include "vnd.h"
#define VND_INFO_FNAME "vnode.json"
#define VND_INFO_FNAME "vnode.json"
#define VND_INFO_FNAME_TMP "vnode_tmp.json"
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData);
@ -27,18 +27,18 @@ static void vnodeWaitCommit(SVnode *pVnode);
int vnodeBegin(SVnode *pVnode) {
// alloc buffer pool
/* pthread_mutex_lock(); */
taosThreadMutexLock(&pVnode->mutex);
while (pVnode->pPool == NULL) {
/* pthread_cond_wait(); */
taosThreadCondWait(&pVnode->poolNotEmpty, &pVnode->mutex);
}
pVnode->inUse = pVnode->pPool;
pVnode->inUse->nRef = 1;
pVnode->pPool = pVnode->inUse->next;
pVnode->inUse->next = NULL;
/* ref pVnode->inUse buffer pool */
/* pthread_mutex_unlock(); */
taosThreadMutexUnlock(&pVnode->mutex);
pVnode->state.commitID++;
// begin meta
@ -217,7 +217,7 @@ int vnodeCommit(SVnode *pVnode) {
vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID,
pVnode->state.applied);
pVnode->onCommit = pVnode->inUse;
vnodeBufPoolUnRef(pVnode->inUse);
pVnode->inUse = NULL;
// save info
@ -284,10 +284,6 @@ int vnodeCommit(SVnode *pVnode) {
// apply the commit (TODO)
walEndSnapshot(pVnode->pWal);
vnodeBufPoolReset(pVnode->onCommit);
pVnode->onCommit->next = pVnode->pPool;
pVnode->pPool = pVnode->onCommit;
pVnode->onCommit = NULL;
vInfo("vgId:%d, commit over", TD_VID(pVnode));

View File

@ -89,6 +89,8 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
tsem_init(&pVnode->syncSem, 0, 0);
tsem_init(&(pVnode->canCommit), 0, 1);
taosThreadMutexInit(&pVnode->mutex, NULL);
taosThreadCondInit(&pVnode->poolNotEmpty, NULL);
// open buffer pool
if (vnodeOpenBufPool(pVnode, pVnode->config.isHeap ? 0 : pVnode->config.szBuf / 3) < 0) {
@ -195,6 +197,8 @@ void vnodeClose(SVnode *pVnode) {
// destroy handle
tsem_destroy(&(pVnode->canCommit));
tsem_destroy(&pVnode->syncSem);
taosThreadCondDestroy(&pVnode->poolNotEmpty);
taosThreadMutexDestroy(&pVnode->mutex);
taosMemoryFree(pVnode);
}
}

View File

@ -1196,7 +1196,6 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
if (pResCol->info.colId == pColMatchInfo->colId) {
SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId);
colDataAssign(pDst, pResCol, pBlock->info.rows, &pInfo->pRes->info);
// taosArraySet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId, pResCol);
colExists = true;
break;
}
@ -1435,6 +1434,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
}
}
}
qDebug("scan rows: %d", pBlockInfo->rows);
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
@ -1507,8 +1507,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
goto _error;
}
SScanPhysiNode* pScanPhyNode = &pTableScanNode->scan;
SScanPhysiNode* pScanPhyNode = &pTableScanNode->scan;
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
pInfo->pTagCond = pTagCond;

View File

@ -5098,12 +5098,7 @@ bool modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
return true;
}
static void doModeAdd(SModeInfo* pInfo, char* data, bool isNull) {
// ignore null elements
if (isNull) {
return;
}
static void doModeAdd(SModeInfo* pInfo, char* data) {
int32_t hashKeyBytes = IS_VAR_DATA_TYPE(pInfo->colType) ? varDataTLen(data) : pInfo->colBytes;
SModeItem** pHashItem = taosHashGet(pInfo->pHash, data, hashKeyBytes);
if (pHashItem == NULL) {
@ -5128,10 +5123,16 @@ int32_t modeFunction(SqlFunctionCtx* pCtx) {
SColumnInfoData* pInputCol = pInput->pData[0];
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
int32_t numOfElems = 0;
int32_t startOffset = pCtx->offset;
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
char* data = colDataGetData(pInputCol, i);
doModeAdd(pInfo, data, colDataIsNull_s(pInputCol, i));
if (colDataIsNull_s(pInputCol, i)) {
continue;
}
numOfElems++;
doModeAdd(pInfo, data);
if (sizeof(SModeInfo) + pInfo->numOfPoints * (sizeof(SModeItem) + pInfo->colBytes) >= MODE_MAX_RESULT_SIZE) {
taosHashCleanup(pInfo->pHash);
@ -5139,7 +5140,7 @@ int32_t modeFunction(SqlFunctionCtx* pCtx) {
}
}
SET_VAL(pResInfo, 1, 1);
SET_VAL(pResInfo, numOfElems, 1);
return TSDB_CODE_SUCCESS;
}

View File

@ -56,6 +56,7 @@
# unsupport ./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v2.sim
# unsupport ./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v3.sim
# unsupport ./test.sh -f tsim/dnode/vnode_clean.sim
./test.sh -f tsim/dnode/use_dropped_dnode.sim
# ---- import
./test.sh -f tsim/import/basic.sim

View File

@ -0,0 +1,133 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
sql connect
print =============== step1 create dnode2
sql create dnode $hostname port 7200
sql create dnode $hostname port 7300
$x = 0
step1:
$ = $x + 1
sleep 1000
if $x == 10 then
print ====> dnode not online!
return -1
endi
sql show dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
print ===> $data20 $data21 $data22 $data23 $data24 $data25
if $rows != 3 then
return -1
endi
if $data(1)[4] != ready then
goto step1
endi
if $data(2)[4] != ready then
goto step1
endi
if $data(3)[4] != ready then
goto step1
endi
print =============== step2 drop dnode 3
sql drop dnode 3
sql create dnode $hostname port 7300
$x = 0
step2:
$ = $x + 1
sleep 1000
if $x == 10 then
print ====> dnode not online!
return -1
endi
sql show dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
print ===> $data20 $data21 $data22 $data23 $data24 $data25
print ===> $data30 $data31 $data32 $data33 $data34 $data35
if $rows != 3 then
return -1
endi
if $data(1)[4] != ready then
goto step2
endi
if $data(2)[4] != ready then
goto step2
endi
if $data(3)[4] != null then
goto step2
endi
if $data(4)[4] != offline then
goto step2
endi
print =============== step3: create dnode 4
sleep 1000
system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/deploy.sh -n dnode3 -i 3
system sh/exec.sh -n dnode3 -s start
$x = 0
step3:
$ = $x + 1
sleep 1000
if $x == 10 then
print ====> dnode not online!
return -1
endi
sql show dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
print ===> $data20 $data21 $data22 $data23 $data24 $data25
print ===> $data30 $data31 $data32 $data33 $data34 $data35
if $rows != 3 then
return -1
endi
if $data(1)[4] != ready then
goto step3
endi
if $data(2)[4] != ready then
goto step3
endi
if $data(3)[4] != null then
goto step3
endi
if $data(4)[4] != ready then
goto step3
endi
print =============== step4: create mnode 4
sql create mnode on dnode 4
$x = 0
step4:
$ = $x + 1
sleep 1000
if $x == 10 then
print ====> dnode not online!
return -1
endi
sql show mnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
if $rows != 2 then
return -1
endi
if $data(1)[2] != leader then
goto step4
endi
if $data(4)[2] != follower then
goto step4
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT

View File

@ -1,260 +0,0 @@
import taos
import sys
import time
import socket
import os
import threading
from enum import Enum
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def __init__(self):
self.snapshot = 0
self.vgroups = 4
self.ctbNum = 1
self.rowsPerTbl = 10000
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 1,
'rowsPerTbl': 100000,
'batchNum': 1200,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data")
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdSql.query("flush database %s"%(paraDict['dbName']))
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 1,
'rowsPerTbl': 100000,
'batchNum': 3000,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 5,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['snapshot'] = self.snapshot
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
# update to half tables
# paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2)
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s where t4 == 'shanghai' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName'])
# queryString = "select ts, c1, c2, t4 from %s.%s where t4 == 'shanghai' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
# paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
consumerId = 0
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2)
topicList = topicFromStb1
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:1000,\
auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("insert process end, and start to check consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
tdLog.info("run select sql from db")
tdSql.query(queryString)
expectrowcnt = tdSql.getRows()
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
if totalConsumeRows != expectrowcnt:
tdLog.exit("tmq consume rows error!")
tmqCom.checkFileContent(consumerId, queryString)
tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self):
tdLog.printNoPrefix("======== test case 2: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 1,
'rowsPerTbl': 10000,
'batchNum': 5000,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 5,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['snapshot'] = self.snapshot
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tdLog.info("restart taosd to ensure that the data falls into the disk")
tdSql.query("flush database %s"%(paraDict['dbName']))
# update to half tables
paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl / 2)
paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2)
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tmqCom.initConsumerTable()
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
# paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
consumerId = 1
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2)
topicList = topicFromStb1
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:1000,\
auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("insert process end, and start to check consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
tdSql.query(queryString)
totalRowsInserted = tdSql.getRows()
tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt))
if totalConsumeRows != expectrowcnt:
tdLog.exit("tmq consume rows error!")
# tmqCom.checkFileContent(consumerId, queryString)
tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 2 end ...... ")
def run(self):
tdSql.prepare()
self.prepareTestEnv()
tdLog.printNoPrefix("=============================================")
tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
self.tmqCase1()
# self.tmqCase2()
# self.prepareTestEnv()
# tdLog.printNoPrefix("====================================================================")
# tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
# self.snapshot = 1
# self.tmqCase1()
# self.tmqCase2()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())