Merge branch '3.0' of github.com:taosdata/TDengine into szhou/fixbugs
This commit is contained in:
commit
dc15fe4f78
|
@ -133,13 +133,6 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table
|
|||
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch* pLocal);
|
||||
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pBlock, uint64_t* useconds);
|
||||
|
||||
/**
|
||||
* kill the ongoing query and free the query handle and corresponding resources automatically
|
||||
* @param tinfo qhandle
|
||||
* @return
|
||||
*/
|
||||
int32_t qKillTask(qTaskInfo_t tinfo);
|
||||
|
||||
/**
|
||||
* kill the ongoing query asynchronously
|
||||
* @param tinfo qhandle
|
||||
|
|
|
@ -38,7 +38,6 @@ extern "C" {
|
|||
#define TARRAY_MIN_SIZE 8
|
||||
#define TARRAY_GET_ELEM(array, index) ((void*)((char*)((array)->pData) + (index) * (array)->elemSize))
|
||||
#define TARRAY_ELEM_IDX(array, ele) (POINTER_DISTANCE(ele, (array)->pData) / (array)->elemSize)
|
||||
#define TARRAY_GET_START(array) ((array)->pData)
|
||||
|
||||
typedef struct SArray {
|
||||
size_t size;
|
||||
|
@ -71,14 +70,6 @@ int32_t taosArrayEnsureCap(SArray* pArray, size_t tsize);
|
|||
*/
|
||||
void* taosArrayAddBatch(SArray* pArray, const void* pData, int32_t nEles);
|
||||
|
||||
/**
|
||||
*
|
||||
* @param pArray
|
||||
* @param pData position array list
|
||||
* @param numOfElems the number of removed position
|
||||
*/
|
||||
void taosArrayRemoveBatch(SArray* pArray, const int32_t* pData, int32_t numOfElems);
|
||||
|
||||
/**
|
||||
*
|
||||
* @param pArray
|
||||
|
@ -266,13 +257,6 @@ void* taosArraySearch(const SArray* pArray, const void* key, __compar_fn_t compa
|
|||
*/
|
||||
int32_t taosArraySearchIdx(const SArray* pArray, const void* key, __compar_fn_t comparFn, int32_t flags);
|
||||
|
||||
/**
|
||||
* search the array
|
||||
* @param pArray
|
||||
* @param key
|
||||
*/
|
||||
char* taosArraySearchString(const SArray* pArray, const char* key, __compar_fn_t comparFn, int32_t flags);
|
||||
|
||||
/**
|
||||
* sort the pointer data in the array
|
||||
* @param pArray
|
||||
|
@ -286,8 +270,6 @@ void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* par
|
|||
int32_t taosEncodeArray(void** buf, const SArray* pArray, FEncode encode);
|
||||
void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t dataSz);
|
||||
|
||||
char* taosShowStrArray(const SArray* pArray);
|
||||
|
||||
/**
|
||||
* swap array
|
||||
* @param a
|
||||
|
@ -295,6 +277,7 @@ char* taosShowStrArray(const SArray* pArray);
|
|||
* @return
|
||||
*/
|
||||
void taosArraySwap(SArray* a, SArray* b);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -167,7 +167,8 @@ void *tsdbGetIdx(SMeta *pMeta);
|
|||
void *tsdbGetIvtIdx(SMeta *pMeta);
|
||||
uint64_t getReaderMaxVersion(STsdbReader *pReader);
|
||||
|
||||
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, void **pReader);
|
||||
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, uint64_t suid,
|
||||
void **pReader);
|
||||
int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids);
|
||||
void *tsdbCacherowsReaderClose(void *pReader);
|
||||
int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid);
|
||||
|
|
|
@ -298,29 +298,6 @@ int32_t tsdbMerge(STsdb *pTsdb);
|
|||
#define TSDB_CACHE_LAST_ROW(c) (((c).cacheLast & 1) > 0)
|
||||
#define TSDB_CACHE_LAST(c) (((c).cacheLast & 2) > 0)
|
||||
|
||||
// tsdbCache ==============================================================================================
|
||||
typedef struct {
|
||||
TSKEY ts;
|
||||
SColVal colVal;
|
||||
} SLastCol;
|
||||
|
||||
int32_t tsdbOpenCache(STsdb *pTsdb);
|
||||
void tsdbCloseCache(STsdb *pTsdb);
|
||||
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb *pTsdb);
|
||||
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, STSRow *row, bool dup);
|
||||
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h);
|
||||
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h);
|
||||
int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h);
|
||||
|
||||
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
|
||||
int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
|
||||
int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
|
||||
|
||||
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity);
|
||||
size_t tsdbCacheGetCapacity(SVnode *pVnode);
|
||||
|
||||
int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pSchema);
|
||||
|
||||
// tsdbDiskData ==============================================================================================
|
||||
int32_t tDiskDataBuilderCreate(SDiskDataBuilder **ppBuilder);
|
||||
void *tDiskDataBuilderDestroy(SDiskDataBuilder *pBuilder);
|
||||
|
@ -729,6 +706,45 @@ void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
|
|||
void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el);
|
||||
void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
|
||||
|
||||
// tsdbCache ==============================================================================================
|
||||
typedef struct SCacheRowsReader {
|
||||
SVnode *pVnode;
|
||||
STSchema *pSchema;
|
||||
uint64_t uid;
|
||||
uint64_t suid;
|
||||
char **transferBuf; // todo remove it soon
|
||||
int32_t numOfCols;
|
||||
int32_t type;
|
||||
int32_t tableIndex; // currently returned result tables
|
||||
SArray *pTableList; // table id list
|
||||
SSttBlockLoadInfo *pLoadInfo;
|
||||
STsdbReadSnap *pReadSnap;
|
||||
SDataFReader *pDataFReader;
|
||||
SDataFReader *pDataFReaderLast;
|
||||
} SCacheRowsReader;
|
||||
|
||||
typedef struct {
|
||||
TSKEY ts;
|
||||
SColVal colVal;
|
||||
} SLastCol;
|
||||
|
||||
int32_t tsdbOpenCache(STsdb *pTsdb);
|
||||
void tsdbCloseCache(STsdb *pTsdb);
|
||||
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb *pTsdb);
|
||||
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, STSRow *row, bool dup);
|
||||
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **h);
|
||||
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **h);
|
||||
int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h);
|
||||
|
||||
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
|
||||
int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
|
||||
int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
|
||||
|
||||
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity);
|
||||
size_t tsdbCacheGetCapacity(SVnode *pVnode);
|
||||
|
||||
int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pSchema);
|
||||
|
||||
// ========== inline functions ==========
|
||||
static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
|
||||
TSDBKEY *pKey1 = (TSDBKEY *)p1;
|
||||
|
|
|
@ -26,7 +26,7 @@ int32_t tsdbOpenCache(STsdb *pTsdb) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
taosLRUCacheSetStrictCapacity(pCache, true);
|
||||
taosLRUCacheSetStrictCapacity(pCache, false);
|
||||
|
||||
taosThreadMutexInit(&pTsdb->lruMutex, NULL);
|
||||
|
||||
|
@ -488,11 +488,12 @@ typedef struct {
|
|||
int32_t nFileSet;
|
||||
int32_t iFileSet;
|
||||
SArray *aDFileSet;
|
||||
SDataFReader *pDataFReader;
|
||||
SDataFReader **pDataFReader;
|
||||
TSDBROW row;
|
||||
|
||||
SMergeTree mergeTree;
|
||||
SMergeTree *pMergeTree;
|
||||
SMergeTree mergeTree;
|
||||
SMergeTree *pMergeTree;
|
||||
SSttBlockLoadInfo *pLoadInfo;
|
||||
} SFSLastNextRowIter;
|
||||
|
||||
static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
|
||||
|
@ -519,18 +520,20 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
|
|||
return code;
|
||||
}
|
||||
|
||||
if (state->pDataFReader != NULL) {
|
||||
tsdbDataFReaderClose(&state->pDataFReader);
|
||||
state->pDataFReader = NULL;
|
||||
if (*state->pDataFReader == NULL || (*state->pDataFReader)->pSet->fid != pFileSet->fid) {
|
||||
if (*state->pDataFReader != NULL) {
|
||||
tsdbDataFReaderClose(state->pDataFReader);
|
||||
|
||||
resetLastBlockLoadInfo(state->pLoadInfo);
|
||||
}
|
||||
|
||||
code = tsdbDataFReaderOpen(state->pDataFReader, state->pTsdb, pFileSet);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
code = tsdbDataFReaderOpen(&state->pDataFReader, state->pTsdb, pFileSet);
|
||||
if (code) goto _err;
|
||||
|
||||
SSttBlockLoadInfo *pLoadInfo = tCreateLastBlockLoadInfo(state->pTSchema, NULL, 0);
|
||||
tMergeTreeOpen(&state->mergeTree, 1, state->pDataFReader, state->suid, state->uid,
|
||||
tMergeTreeOpen(&state->mergeTree, 1, *state->pDataFReader, state->suid, state->uid,
|
||||
&(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX},
|
||||
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, pLoadInfo, true, NULL);
|
||||
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL);
|
||||
state->pMergeTree = &state->mergeTree;
|
||||
bool hasVal = tMergeTreeNext(&state->mergeTree);
|
||||
if (!hasVal) {
|
||||
|
@ -554,10 +557,10 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
|
|||
}
|
||||
|
||||
_err:
|
||||
if (state->pDataFReader) {
|
||||
/*if (state->pDataFReader) {
|
||||
tsdbDataFReaderClose(&state->pDataFReader);
|
||||
state->pDataFReader = NULL;
|
||||
}
|
||||
}*/
|
||||
if (state->pMergeTree != NULL) {
|
||||
tMergeTreeClose(state->pMergeTree);
|
||||
state->pMergeTree = NULL;
|
||||
|
@ -575,12 +578,12 @@ int32_t clearNextRowFromFSLast(void *iter) {
|
|||
if (!state) {
|
||||
return code;
|
||||
}
|
||||
|
||||
/*
|
||||
if (state->pDataFReader) {
|
||||
tsdbDataFReaderClose(&state->pDataFReader);
|
||||
state->pDataFReader = NULL;
|
||||
}
|
||||
|
||||
*/
|
||||
if (state->pMergeTree != NULL) {
|
||||
tMergeTreeClose(state->pMergeTree);
|
||||
state->pMergeTree = NULL;
|
||||
|
@ -597,27 +600,28 @@ typedef enum SFSNEXTROWSTATES {
|
|||
} SFSNEXTROWSTATES;
|
||||
|
||||
typedef struct SFSNextRowIter {
|
||||
SFSNEXTROWSTATES state; // [input]
|
||||
STsdb *pTsdb; // [input]
|
||||
SBlockIdx *pBlockIdxExp; // [input]
|
||||
STSchema *pTSchema; // [input]
|
||||
tb_uid_t suid;
|
||||
tb_uid_t uid;
|
||||
int32_t nFileSet;
|
||||
int32_t iFileSet;
|
||||
SArray *aDFileSet;
|
||||
SDataFReader *pDataFReader;
|
||||
SArray *aBlockIdx;
|
||||
SBlockIdx *pBlockIdx;
|
||||
SMapData blockMap;
|
||||
int32_t nBlock;
|
||||
int32_t iBlock;
|
||||
SDataBlk block;
|
||||
SBlockData blockData;
|
||||
SBlockData *pBlockData;
|
||||
int32_t nRow;
|
||||
int32_t iRow;
|
||||
TSDBROW row;
|
||||
SFSNEXTROWSTATES state; // [input]
|
||||
STsdb *pTsdb; // [input]
|
||||
SBlockIdx *pBlockIdxExp; // [input]
|
||||
STSchema *pTSchema; // [input]
|
||||
tb_uid_t suid;
|
||||
tb_uid_t uid;
|
||||
int32_t nFileSet;
|
||||
int32_t iFileSet;
|
||||
SArray *aDFileSet;
|
||||
SDataFReader **pDataFReader;
|
||||
SArray *aBlockIdx;
|
||||
SBlockIdx *pBlockIdx;
|
||||
SMapData blockMap;
|
||||
int32_t nBlock;
|
||||
int32_t iBlock;
|
||||
SDataBlk block;
|
||||
SBlockData blockData;
|
||||
SBlockData *pBlockData;
|
||||
int32_t nRow;
|
||||
int32_t iRow;
|
||||
TSDBROW row;
|
||||
SSttBlockLoadInfo *pLoadInfo;
|
||||
} SFSNextRowIter;
|
||||
|
||||
static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
|
||||
|
@ -648,8 +652,16 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
|
|||
return code;
|
||||
}
|
||||
|
||||
code = tsdbDataFReaderOpen(&state->pDataFReader, state->pTsdb, pFileSet);
|
||||
if (code) goto _err;
|
||||
if (*state->pDataFReader == NULL || (*state->pDataFReader)->pSet->fid != pFileSet->fid) {
|
||||
if (*state->pDataFReader != NULL) {
|
||||
tsdbDataFReaderClose(state->pDataFReader);
|
||||
|
||||
resetLastBlockLoadInfo(state->pLoadInfo);
|
||||
}
|
||||
|
||||
code = tsdbDataFReaderOpen(state->pDataFReader, state->pTsdb, pFileSet);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
// tMapDataReset(&state->blockIdxMap);
|
||||
if (!state->aBlockIdx) {
|
||||
|
@ -657,7 +669,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
|
|||
} else {
|
||||
taosArrayClear(state->aBlockIdx);
|
||||
}
|
||||
code = tsdbReadBlockIdx(state->pDataFReader, state->aBlockIdx);
|
||||
code = tsdbReadBlockIdx(*state->pDataFReader, state->aBlockIdx);
|
||||
if (code) goto _err;
|
||||
|
||||
/* if (state->pBlockIdx) { */
|
||||
|
@ -666,17 +678,20 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
|
|||
* &state->blockIdx);
|
||||
*/
|
||||
state->pBlockIdx = taosArraySearch(state->aBlockIdx, state->pBlockIdxExp, tCmprBlockIdx, TD_EQ);
|
||||
if (!state->pBlockIdx) {
|
||||
tsdbDataFReaderClose(&state->pDataFReader);
|
||||
state->pDataFReader = NULL;
|
||||
if (!state->pBlockIdx) { /*
|
||||
tsdbDataFReaderClose(state->pDataFReader);
|
||||
*state->pDataFReader = NULL;
|
||||
resetLastBlockLoadInfo(state->pLoadInfo);*/
|
||||
goto _next_fileset;
|
||||
}
|
||||
|
||||
tMapDataReset(&state->blockMap);
|
||||
/*
|
||||
if (state->blockMap.pData != NULL) {
|
||||
tMapDataClear(&state->blockMap);
|
||||
}
|
||||
|
||||
code = tsdbReadDataBlk(state->pDataFReader, state->pBlockIdx, &state->blockMap);
|
||||
*/
|
||||
code = tsdbReadDataBlk(*state->pDataFReader, state->pBlockIdx, &state->blockMap);
|
||||
if (code) goto _err;
|
||||
|
||||
state->nBlock = state->blockMap.nItem;
|
||||
|
@ -703,7 +718,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
|
|||
code = tBlockDataInit(state->pBlockData, &tid, state->pTSchema, NULL, 0);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tsdbReadDataBlock(state->pDataFReader, &block, state->pBlockData);
|
||||
code = tsdbReadDataBlock(*state->pDataFReader, &block, state->pBlockData);
|
||||
if (code) goto _err;
|
||||
|
||||
state->nRow = state->blockData.nRow;
|
||||
|
@ -719,8 +734,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
|
|||
if (--state->iRow < 0) {
|
||||
state->state = SFSNEXTROW_BLOCKDATA;
|
||||
if (--state->iBlock < 0) {
|
||||
tsdbDataFReaderClose(&state->pDataFReader);
|
||||
state->pDataFReader = NULL;
|
||||
tsdbDataFReaderClose(state->pDataFReader);
|
||||
*state->pDataFReader = NULL;
|
||||
resetLastBlockLoadInfo(state->pLoadInfo);
|
||||
|
||||
if (state->aBlockIdx) {
|
||||
taosArrayDestroy(state->aBlockIdx);
|
||||
|
@ -739,16 +755,17 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
|
|||
}
|
||||
|
||||
_err:
|
||||
if (state->pDataFReader) {
|
||||
tsdbDataFReaderClose(&state->pDataFReader);
|
||||
state->pDataFReader = NULL;
|
||||
}
|
||||
/*
|
||||
if (*state->pDataFReader) {
|
||||
tsdbDataFReaderClose(state->pDataFReader);
|
||||
*state->pDataFReader = NULL;
|
||||
resetLastBlockLoadInfo(state->pLoadInfo);
|
||||
}*/
|
||||
if (state->aBlockIdx) {
|
||||
taosArrayDestroy(state->aBlockIdx);
|
||||
state->aBlockIdx = NULL;
|
||||
}
|
||||
if (state->pBlockData) {
|
||||
// tBlockDataDestroy(&state->blockData, 1);
|
||||
tBlockDataDestroy(state->pBlockData, 1);
|
||||
state->pBlockData = NULL;
|
||||
}
|
||||
|
@ -765,11 +782,11 @@ int32_t clearNextRowFromFS(void *iter) {
|
|||
if (!state) {
|
||||
return code;
|
||||
}
|
||||
|
||||
/*
|
||||
if (state->pDataFReader) {
|
||||
tsdbDataFReaderClose(&state->pDataFReader);
|
||||
state->pDataFReader = NULL;
|
||||
}
|
||||
}*/
|
||||
if (state->aBlockIdx) {
|
||||
taosArrayDestroy(state->aBlockIdx);
|
||||
state->aBlockIdx = NULL;
|
||||
|
@ -930,25 +947,22 @@ typedef struct {
|
|||
TSDBROW memRow, imemRow, fsLastRow, fsRow;
|
||||
|
||||
TsdbNextRowState input[4];
|
||||
STsdbReadSnap *pReadSnap;
|
||||
STsdb *pTsdb;
|
||||
} CacheNextRowIter;
|
||||
|
||||
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema) {
|
||||
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
|
||||
SSttBlockLoadInfo *pLoadInfo, STsdbReadSnap *pReadSnap, SDataFReader **pDataFReader,
|
||||
SDataFReader **pDataFReaderLast) {
|
||||
int code = 0;
|
||||
|
||||
tb_uid_t suid = getTableSuidByUid(uid, pTsdb);
|
||||
|
||||
tsdbTakeReadSnap(pTsdb, &pIter->pReadSnap, NULL);
|
||||
|
||||
STbData *pMem = NULL;
|
||||
if (pIter->pReadSnap->pMem) {
|
||||
pMem = tsdbGetTbDataFromMemTable(pIter->pReadSnap->pMem, suid, uid);
|
||||
if (pReadSnap->pMem) {
|
||||
pMem = tsdbGetTbDataFromMemTable(pReadSnap->pMem, suid, uid);
|
||||
}
|
||||
|
||||
STbData *pIMem = NULL;
|
||||
if (pIter->pReadSnap->pIMem) {
|
||||
pIMem = tsdbGetTbDataFromMemTable(pIter->pReadSnap->pIMem, suid, uid);
|
||||
if (pReadSnap->pIMem) {
|
||||
pIMem = tsdbGetTbDataFromMemTable(pReadSnap->pIMem, suid, uid);
|
||||
}
|
||||
|
||||
pIter->pTsdb = pTsdb;
|
||||
|
@ -957,7 +971,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
|
|||
|
||||
SDelIdx delIdx;
|
||||
|
||||
SDelFile *pDelFile = pIter->pReadSnap->fs.pDelFile;
|
||||
SDelFile *pDelFile = pReadSnap->fs.pDelFile;
|
||||
if (pDelFile) {
|
||||
SDelFReader *pDelFReader;
|
||||
|
||||
|
@ -988,18 +1002,22 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
|
|||
|
||||
pIter->fsLastState.state = (SFSLASTNEXTROWSTATES)SFSNEXTROW_FS;
|
||||
pIter->fsLastState.pTsdb = pTsdb;
|
||||
pIter->fsLastState.aDFileSet = pIter->pReadSnap->fs.aDFileSet;
|
||||
pIter->fsLastState.aDFileSet = pReadSnap->fs.aDFileSet;
|
||||
pIter->fsLastState.pTSchema = pTSchema;
|
||||
pIter->fsLastState.suid = suid;
|
||||
pIter->fsLastState.uid = uid;
|
||||
pIter->fsLastState.pLoadInfo = pLoadInfo;
|
||||
pIter->fsLastState.pDataFReader = pDataFReaderLast;
|
||||
|
||||
pIter->fsState.state = SFSNEXTROW_FS;
|
||||
pIter->fsState.pTsdb = pTsdb;
|
||||
pIter->fsState.aDFileSet = pIter->pReadSnap->fs.aDFileSet;
|
||||
pIter->fsState.aDFileSet = pReadSnap->fs.aDFileSet;
|
||||
pIter->fsState.pBlockIdxExp = &pIter->idx;
|
||||
pIter->fsState.pTSchema = pTSchema;
|
||||
pIter->fsState.suid = suid;
|
||||
pIter->fsState.uid = uid;
|
||||
pIter->fsState.pLoadInfo = pLoadInfo;
|
||||
pIter->fsState.pDataFReader = pDataFReader;
|
||||
|
||||
pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, &pIter->memState, getNextRowFromMem, NULL};
|
||||
pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, &pIter->imemState, getNextRowFromMem, NULL};
|
||||
|
@ -1040,8 +1058,6 @@ static int32_t nextRowIterClose(CacheNextRowIter *pIter) {
|
|||
taosArrayDestroy(pIter->pSkyline);
|
||||
}
|
||||
|
||||
tsdbUntakeReadSnap(pIter->pTsdb, pIter->pReadSnap, NULL);
|
||||
|
||||
_err:
|
||||
return code;
|
||||
}
|
||||
|
@ -1119,10 +1135,10 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppColArray) {
|
||||
static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppColArray, SCacheRowsReader *pr) {
|
||||
int32_t code = 0;
|
||||
|
||||
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
|
||||
STSchema *pTSchema = pr->pSchema; // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
|
||||
int16_t nCol = pTSchema->numOfCols;
|
||||
int16_t iCol = 0;
|
||||
int16_t noneCol = 0;
|
||||
|
@ -1133,7 +1149,8 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo
|
|||
TSKEY lastRowTs = TSKEY_MAX;
|
||||
|
||||
CacheNextRowIter iter = {0};
|
||||
nextRowIterOpen(&iter, uid, pTsdb, pTSchema);
|
||||
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pReadSnap, &pr->pDataFReader,
|
||||
&pr->pDataFReaderLast);
|
||||
|
||||
do {
|
||||
TSDBROW *pRow = NULL;
|
||||
|
@ -1233,20 +1250,20 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo
|
|||
}
|
||||
|
||||
nextRowIterClose(&iter);
|
||||
taosMemoryFreeClear(pTSchema);
|
||||
// taosMemoryFreeClear(pTSchema);
|
||||
return code;
|
||||
|
||||
_err:
|
||||
nextRowIterClose(&iter);
|
||||
taosArrayDestroy(pColArray);
|
||||
taosMemoryFreeClear(pTSchema);
|
||||
// taosMemoryFreeClear(pTSchema);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
|
||||
static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr) {
|
||||
int32_t code = 0;
|
||||
|
||||
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
|
||||
STSchema *pTSchema = pr->pSchema; // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
|
||||
int16_t nCol = pTSchema->numOfCols;
|
||||
int16_t iCol = 0;
|
||||
int16_t noneCol = 0;
|
||||
|
@ -1257,7 +1274,8 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
|
|||
TSKEY lastRowTs = TSKEY_MAX;
|
||||
|
||||
CacheNextRowIter iter = {0};
|
||||
nextRowIterOpen(&iter, uid, pTsdb, pTSchema);
|
||||
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pReadSnap, &pr->pDataFReader,
|
||||
&pr->pDataFReaderLast);
|
||||
|
||||
do {
|
||||
TSDBROW *pRow = NULL;
|
||||
|
@ -1350,18 +1368,18 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
|
|||
}
|
||||
|
||||
nextRowIterClose(&iter);
|
||||
taosMemoryFreeClear(pTSchema);
|
||||
// taosMemoryFreeClear(pTSchema);
|
||||
return code;
|
||||
|
||||
_err:
|
||||
nextRowIterClose(&iter);
|
||||
taosMemoryFreeClear(pTSchema);
|
||||
// taosMemoryFreeClear(pTSchema);
|
||||
*ppLastArray = NULL;
|
||||
taosArrayDestroy(pColArray);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **handle) {
|
||||
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **handle) {
|
||||
int32_t code = 0;
|
||||
char key[32] = {0};
|
||||
int keyLen = 0;
|
||||
|
@ -1370,13 +1388,14 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH
|
|||
getTableCacheKey(uid, 0, key, &keyLen);
|
||||
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
|
||||
if (!h) {
|
||||
STsdb *pTsdb = pr->pVnode->pTsdb;
|
||||
taosThreadMutexLock(&pTsdb->lruMutex);
|
||||
|
||||
h = taosLRUCacheLookup(pCache, key, keyLen);
|
||||
if (!h) {
|
||||
SArray *pArray = NULL;
|
||||
bool dup = false; // which is always false for now
|
||||
code = mergeLastRow(uid, pTsdb, &dup, &pArray);
|
||||
code = mergeLastRow(uid, pTsdb, &dup, &pArray, pr);
|
||||
// if table's empty or error, return code of -1
|
||||
if (code < 0 || pArray == NULL) {
|
||||
if (!dup && pArray) {
|
||||
|
@ -1392,17 +1411,17 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH
|
|||
|
||||
size_t charge = pArray->capacity * pArray->elemSize + sizeof(*pArray);
|
||||
_taos_lru_deleter_t deleter = deleteTableCacheLast;
|
||||
LRUStatus status = taosLRUCacheInsert(pCache, key, keyLen, pArray, charge, deleter, NULL, TAOS_LRU_PRIORITY_LOW);
|
||||
LRUStatus status = taosLRUCacheInsert(pCache, key, keyLen, pArray, charge, deleter, &h, TAOS_LRU_PRIORITY_LOW);
|
||||
if (status != TAOS_LRU_STATUS_OK) {
|
||||
code = -1;
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
// taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
|
||||
h = taosLRUCacheLookup(pCache, key, keyLen);
|
||||
} else {
|
||||
taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
}
|
||||
// h = taosLRUCacheLookup(pCache, key, keyLen);
|
||||
} // else {
|
||||
taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
//}
|
||||
}
|
||||
|
||||
*handle = h;
|
||||
|
@ -1434,7 +1453,7 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **handle) {
|
||||
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **handle) {
|
||||
int32_t code = 0;
|
||||
char key[32] = {0};
|
||||
int keyLen = 0;
|
||||
|
@ -1443,12 +1462,13 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHand
|
|||
getTableCacheKey(uid, 1, key, &keyLen);
|
||||
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
|
||||
if (!h) {
|
||||
STsdb *pTsdb = pr->pVnode->pTsdb;
|
||||
taosThreadMutexLock(&pTsdb->lruMutex);
|
||||
|
||||
h = taosLRUCacheLookup(pCache, key, keyLen);
|
||||
if (!h) {
|
||||
SArray *pLastArray = NULL;
|
||||
code = mergeLast(uid, pTsdb, &pLastArray);
|
||||
code = mergeLast(uid, pTsdb, &pLastArray, pr);
|
||||
// if table's empty or error, return code of -1
|
||||
if (code < 0 || pLastArray == NULL) {
|
||||
taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
|
@ -1460,17 +1480,17 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHand
|
|||
size_t charge = pLastArray->capacity * pLastArray->elemSize + sizeof(*pLastArray);
|
||||
_taos_lru_deleter_t deleter = deleteTableCacheLast;
|
||||
LRUStatus status =
|
||||
taosLRUCacheInsert(pCache, key, keyLen, pLastArray, charge, deleter, NULL, TAOS_LRU_PRIORITY_LOW);
|
||||
taosLRUCacheInsert(pCache, key, keyLen, pLastArray, charge, deleter, &h, TAOS_LRU_PRIORITY_LOW);
|
||||
if (status != TAOS_LRU_STATUS_OK) {
|
||||
code = -1;
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
// taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
|
||||
h = taosLRUCacheLookup(pCache, key, keyLen);
|
||||
} else {
|
||||
taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
}
|
||||
// h = taosLRUCacheLookup(pCache, key, keyLen);
|
||||
} // else {
|
||||
taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
//}
|
||||
}
|
||||
|
||||
*handle = h;
|
||||
|
|
|
@ -18,18 +18,7 @@
|
|||
#include "tcommon.h"
|
||||
#include "tsdb.h"
|
||||
|
||||
typedef struct SCacheRowsReader {
|
||||
SVnode* pVnode;
|
||||
STSchema* pSchema;
|
||||
uint64_t uid;
|
||||
char** transferBuf; // todo remove it soon
|
||||
int32_t numOfCols;
|
||||
int32_t type;
|
||||
int32_t tableIndex; // currently returned result tables
|
||||
SArray* pTableList; // table id list
|
||||
} SCacheRowsReader;
|
||||
|
||||
#define HASTYPE(_type, _t) (((_type) & (_t)) == (_t))
|
||||
#define HASTYPE(_type, _t) (((_type) & (_t)) == (_t))
|
||||
|
||||
static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds,
|
||||
void** pRes) {
|
||||
|
@ -61,7 +50,7 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea
|
|||
if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
|
||||
varDataSetLen(p->buf, pColVal->colVal.value.nData);
|
||||
memcpy(varDataVal(p->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData);
|
||||
p->bytes = pColVal->colVal.value.nData + VARSTR_HEADER_SIZE; // binary needs to plus the header size
|
||||
p->bytes = pColVal->colVal.value.nData + VARSTR_HEADER_SIZE; // binary needs to plus the header size
|
||||
} else {
|
||||
memcpy(p->buf, &pColVal->colVal.value, pReader->pSchema->columns[slotId].bytes);
|
||||
p->bytes = pReader->pSchema->columns[slotId].bytes;
|
||||
|
@ -75,7 +64,7 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea
|
|||
colDataAppend(pColInfoData, numOfRows, (const char*)pRes[i], false);
|
||||
}
|
||||
|
||||
pBlock->info.rows += allNullRow? 0:1;
|
||||
pBlock->info.rows += allNullRow ? 0 : 1;
|
||||
} else {
|
||||
ASSERT(HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW));
|
||||
|
||||
|
@ -108,7 +97,8 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea
|
|||
}
|
||||
}
|
||||
|
||||
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t numOfCols, void** pReader) {
|
||||
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t numOfCols, uint64_t suid,
|
||||
void** pReader) {
|
||||
*pReader = NULL;
|
||||
|
||||
SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader));
|
||||
|
@ -119,6 +109,7 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList
|
|||
p->type = type;
|
||||
p->pVnode = pVnode;
|
||||
p->numOfCols = numOfCols;
|
||||
p->suid = suid;
|
||||
|
||||
if (taosArrayGetSize(pTableIdList) == 0) {
|
||||
*pReader = p;
|
||||
|
@ -145,6 +136,12 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList
|
|||
}
|
||||
}
|
||||
|
||||
p->pLoadInfo = tCreateLastBlockLoadInfo(p->pSchema, NULL, 0);
|
||||
if (p->pLoadInfo == NULL) {
|
||||
tsdbCacherowsReaderClose(p);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
*pReader = p;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -161,6 +158,8 @@ void* tsdbCacherowsReaderClose(void* pReader) {
|
|||
taosMemoryFree(p->pSchema);
|
||||
}
|
||||
|
||||
destroyLastBlockLoadInfo(p->pLoadInfo);
|
||||
|
||||
taosMemoryFree(pReader);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -171,9 +170,9 @@ static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint
|
|||
*pRow = NULL;
|
||||
|
||||
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
|
||||
code = tsdbCacheGetLastrowH(lruCache, uid, pr->pVnode->pTsdb, h);
|
||||
code = tsdbCacheGetLastrowH(lruCache, uid, pr, h);
|
||||
} else {
|
||||
code = tsdbCacheGetLastH(lruCache, uid, pr->pVnode->pTsdb, h);
|
||||
code = tsdbCacheGetLastH(lruCache, uid, pr, h);
|
||||
}
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -189,7 +188,7 @@ static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint
|
|||
}
|
||||
|
||||
static void freeItem(void* pItem) {
|
||||
SLastCol* pCol = (SLastCol*) pItem;
|
||||
SLastCol* pCol = (SLastCol*)pItem;
|
||||
if (IS_VAR_DATA_TYPE(pCol->colVal.type)) {
|
||||
taosMemoryFree(pCol->colVal.value.pData);
|
||||
}
|
||||
|
@ -230,7 +229,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
|
||||
for (int32_t i = 0; i < pr->pSchema->numOfCols; ++i) {
|
||||
struct STColumn* pCol = &pr->pSchema->columns[i];
|
||||
SLastCol p = {.ts = INT64_MIN, .colVal.type = pCol->type};
|
||||
SLastCol p = {.ts = INT64_MIN, .colVal.type = pCol->type};
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
||||
p.colVal.value.pData = taosMemoryCalloc(pCol->bytes, sizeof(char));
|
||||
|
@ -238,6 +237,10 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
taosArrayPush(pLastCols, &p);
|
||||
}
|
||||
|
||||
tsdbTakeReadSnap(pr->pVnode->pTsdb, &pr->pReadSnap, "cache-l");
|
||||
pr->pDataFReader = NULL;
|
||||
pr->pDataFReaderLast = NULL;
|
||||
|
||||
// retrieve the only one last row of all tables in the uid list.
|
||||
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) {
|
||||
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||
|
@ -306,7 +309,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
|
||||
} else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
|
||||
for (int32_t i = pr->tableIndex; i < numOfTables; ++i) {
|
||||
STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(pr->pTableList, i);
|
||||
STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(pr->pTableList, i);
|
||||
code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -331,7 +334,12 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
code = TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
_end:
|
||||
_end:
|
||||
tsdbDataFReaderClose(&pr->pDataFReaderLast);
|
||||
tsdbDataFReaderClose(&pr->pDataFReader);
|
||||
|
||||
tsdbUntakeReadSnap(pr->pVnode->pTsdb, pr->pReadSnap, "cache-l");
|
||||
|
||||
for (int32_t j = 0; j < pr->numOfCols; ++j) {
|
||||
taosMemoryFree(pRes[j]);
|
||||
}
|
||||
|
|
|
@ -1506,7 +1506,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) {
|
|||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
#else
|
||||
if (pCommitter->dWriter.bData.nRow >= pCommitter->maxRow) {
|
||||
if (pCommitter->dWriter.bDatal.nRow >= pCommitter->maxRow) {
|
||||
code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk,
|
||||
pCommitter->cmprAlg);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
|
|
@ -152,8 +152,8 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
|
|||
pInfo->loadBlocks += 1;
|
||||
|
||||
tsdbDebug("read last block, total load:%d, trigger by uid:%" PRIu64
|
||||
", last file index:%d, last block index:%d, entry:%d, %p, elapsed time:%.2f ms, %s",
|
||||
pInfo->loadBlocks, pIter->uid, pIter->iStt, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock, el,
|
||||
", last file index:%d, last block index:%d, entry:%d, rows:%d, %p, elapsed time:%.2f ms, %s",
|
||||
pInfo->loadBlocks, pIter->uid, pIter->iStt, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock->nRow, pBlock, el,
|
||||
idStr);
|
||||
|
||||
pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk;
|
||||
|
|
|
@ -2139,12 +2139,13 @@ static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
|
|||
}
|
||||
|
||||
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLastBlockReader->mergeTree.pIter != NULL; }
|
||||
bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
|
||||
if (pBlockData->nRow > 0) {
|
||||
ASSERT(pBlockData->nRow == pDumpInfo->totalRows);
|
||||
|
||||
bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
|
||||
if (pBlockData->nRow > 0) {
|
||||
ASSERT(pBlockData->nRow == pDumpInfo->totalRows);
|
||||
}
|
||||
|
||||
return pBlockData->nRow > 0 && (!pDumpInfo->allDumped);
|
||||
return pBlockData->nRow > 0 && (!pDumpInfo->allDumped);
|
||||
}
|
||||
|
||||
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
|
||||
|
@ -2619,6 +2620,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
|||
ASSERT(tsLast >= pBlock->maxKey.ts);
|
||||
tBlockDataReset(&pReader->status.fileBlockData);
|
||||
|
||||
tsdbDebug("load data in last block firstly, due to desc scan data, %s", pReader->idStr);
|
||||
code = buildComposedDataBlock(pReader);
|
||||
} else { // whole block is required, return it directly
|
||||
SDataBlockInfo* pInfo = &pReader->pResBlock->info;
|
||||
|
|
|
@ -46,7 +46,8 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
|
|||
pInfo->pRes = createResDataBlock(pDescNode);
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
code = extractColMatchInfo(pScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
|
||||
code =
|
||||
extractColMatchInfo(pScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
|
||||
removeRedundantTsCol(pScanNode, &pInfo->matchInfo);
|
||||
|
||||
code = extractCacheScanSlotId(pInfo->matchInfo.pList, pTaskInfo, &pInfo->pSlotIds);
|
||||
|
@ -62,17 +63,19 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
|
|||
|
||||
// partition by tbname
|
||||
if (taosArrayGetSize(pTableList->pGroupList) == taosArrayGetSize(pTableList->pTableList)) {
|
||||
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL|(pScanNode->ignoreNull? CACHESCAN_RETRIEVE_LAST:CACHESCAN_RETRIEVE_LAST_ROW);
|
||||
pInfo->retrieveType =
|
||||
CACHESCAN_RETRIEVE_TYPE_ALL | (pScanNode->ignoreNull ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW);
|
||||
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pTableList->pTableList,
|
||||
taosArrayGetSize(pInfo->matchInfo.pList), &pInfo->pLastrowReader);
|
||||
taosArrayGetSize(pInfo->matchInfo.pList), pTableList->suid, &pInfo->pLastrowReader);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->pBufferredRes = createOneDataBlock(pInfo->pRes, false);
|
||||
blockDataEnsureCapacity(pInfo->pBufferredRes, pOperator->resultInfo.capacity);
|
||||
} else { // by tags
|
||||
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE|(pScanNode->ignoreNull? CACHESCAN_RETRIEVE_LAST:CACHESCAN_RETRIEVE_LAST_ROW);
|
||||
} else { // by tags
|
||||
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE |
|
||||
(pScanNode->ignoreNull ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW);
|
||||
}
|
||||
|
||||
if (pScanNode->scan.pScanPseudoCols != NULL) {
|
||||
|
@ -188,7 +191,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
|||
SArray* pGroupTableList = taosArrayGetP(pTableList->pGroupList, pInfo->currentGroupIndex);
|
||||
|
||||
tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pGroupTableList,
|
||||
taosArrayGetSize(pInfo->matchInfo.pList), &pInfo->pLastrowReader);
|
||||
taosArrayGetSize(pInfo->matchInfo.pList), pTableList->suid, &pInfo->pLastrowReader);
|
||||
taosArrayClear(pInfo->pUidList);
|
||||
|
||||
int32_t code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList);
|
||||
|
@ -257,8 +260,7 @@ int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTask
|
|||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColMatchItem* pColMatch = taosArrayGet(pColMatchInfo, i);
|
||||
for (int32_t j = 0; j < pWrapper->nCols; ++j) {
|
||||
if (pColMatch->colId == pWrapper->pSchema[j].colId &&
|
||||
pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||
if (pColMatch->colId == pWrapper->pSchema[j].colId && pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||
(*pSlotIds)[pColMatch->dstSlotId] = -1;
|
||||
break;
|
||||
}
|
||||
|
@ -296,4 +298,4 @@ int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pC
|
|||
taosArrayDestroy(pColMatchInfo->pList);
|
||||
pColMatchInfo->pList = pMatchInfo;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -564,23 +564,6 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
|
|||
return pTaskInfo->code;
|
||||
}
|
||||
|
||||
int32_t qKillTask(qTaskInfo_t qinfo) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
|
||||
if (pTaskInfo == NULL) {
|
||||
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||
}
|
||||
|
||||
qAsyncKillTask(qinfo);
|
||||
|
||||
// Wait for the query executing thread being stopped/
|
||||
// Once the query is stopped, the owner of qHandle will be cleared immediately.
|
||||
while (pTaskInfo->owner != 0) {
|
||||
taosMsleep(100);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
|
||||
|
||||
|
|
|
@ -3639,9 +3639,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
return pOperator;
|
||||
}
|
||||
|
||||
int32_t num = 0;
|
||||
size_t size = LIST_LENGTH(pPhyNode->pChildren);
|
||||
|
||||
size_t size = LIST_LENGTH(pPhyNode->pChildren);
|
||||
SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
|
||||
|
|
|
@ -49,8 +49,7 @@ extern "C" {
|
|||
#define FUNC_MGT_MULTI_ROWS_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(20)
|
||||
#define FUNC_MGT_KEEP_ORDER_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(21)
|
||||
#define FUNC_MGT_CUMULATIVE_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(22)
|
||||
#define FUNC_MGT_FORBID_STABLE_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(23)
|
||||
#define FUNC_MGT_INTERP_PC_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(24)
|
||||
#define FUNC_MGT_INTERP_PC_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(23)
|
||||
|
||||
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)
|
||||
|
||||
|
|
|
@ -2381,7 +2381,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.name = "interp",
|
||||
.type = FUNCTION_TYPE_INTERP,
|
||||
.classification = FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
|
||||
FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_STABLE_FUNC,
|
||||
FUNC_MGT_FORBID_STREAM_FUNC,
|
||||
.translateFunc = translateInterp,
|
||||
.getEnvFunc = getSelectivityFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
|
|
|
@ -216,8 +216,6 @@ bool fmIsKeepOrderFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, F
|
|||
|
||||
bool fmIsCumulativeFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_CUMULATIVE_FUNC); }
|
||||
|
||||
bool fmIsForbidSuperTableFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_FORBID_STABLE_FUNC); }
|
||||
|
||||
bool fmIsInterpFunc(int32_t funcId) {
|
||||
if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
|
||||
return false;
|
||||
|
|
|
@ -1537,25 +1537,6 @@ static int32_t translateRepeatScanFunc(STranslateContext* pCxt, SFunctionNode* p
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateForbidSuperTableFunc(STranslateContext* pCxt, SFunctionNode* pFunc) {
|
||||
if (!fmIsForbidSuperTableFunc(pFunc->funcId)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
if (!isSelectStmt(pCxt->pCurrStmt)) {
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_SUPPORT_SINGLE_TABLE,
|
||||
"%s is only supported in single table query", pFunc->functionName);
|
||||
}
|
||||
SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt;
|
||||
SNode* pTable = pSelect->pFromTable;
|
||||
if ((NULL != pTable && (QUERY_NODE_REAL_TABLE != nodeType(pTable) ||
|
||||
(TSDB_CHILD_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType &&
|
||||
TSDB_NORMAL_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType)))) {
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_SUPPORT_SINGLE_TABLE,
|
||||
"%s is only supported in single table query", pFunc->functionName);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static bool isStar(SNode* pNode) {
|
||||
return (QUERY_NODE_COLUMN == nodeType(pNode)) && ('\0' == ((SColumnNode*)pNode)->tableAlias[0]) &&
|
||||
(0 == strcmp(((SColumnNode*)pNode)->colName, "*"));
|
||||
|
@ -1717,9 +1698,6 @@ static int32_t rewriteSystemInfoFunc(STranslateContext* pCxt, SNode** pNode) {
|
|||
|
||||
static int32_t translateNoramlFunction(STranslateContext* pCxt, SFunctionNode* pFunc) {
|
||||
int32_t code = translateAggFunc(pCxt, pFunc);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = translateForbidSuperTableFunc(pCxt, pFunc);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = translateScanPseudoColumnFunc(pCxt, pFunc);
|
||||
}
|
||||
|
|
|
@ -79,6 +79,12 @@ typedef struct SSyncTimer {
|
|||
void* pData;
|
||||
} SSyncTimer;
|
||||
|
||||
typedef struct SElectTimer {
|
||||
uint64_t logicClock;
|
||||
SSyncNode* pSyncNode;
|
||||
void* pData;
|
||||
} SElectTimer;
|
||||
|
||||
int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
|
||||
int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
|
||||
int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
|
||||
|
@ -155,7 +161,6 @@ typedef struct SSyncNode {
|
|||
tmr_h pElectTimer;
|
||||
int32_t electTimerMS;
|
||||
uint64_t electTimerLogicClock;
|
||||
uint64_t electTimerLogicClockUser;
|
||||
TAOS_TMR_CALLBACK FpElectTimerCB; // Timer Fp
|
||||
uint64_t electTimerCounter;
|
||||
|
||||
|
|
|
@ -1310,7 +1310,6 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
|
|||
pSyncNode->pElectTimer = NULL;
|
||||
pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
|
||||
atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
|
||||
atomic_store_64(&pSyncNode->electTimerLogicClockUser, 0);
|
||||
pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
|
||||
pSyncNode->electTimerCounter = 0;
|
||||
|
||||
|
@ -1565,17 +1564,14 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
|
|||
int32_t ret = 0;
|
||||
if (syncEnvIsStart()) {
|
||||
pSyncNode->electTimerMS = ms;
|
||||
taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
|
||||
&pSyncNode->pElectTimer);
|
||||
atomic_store_64(&pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser);
|
||||
|
||||
/*
|
||||
do {
|
||||
char logBuf[128];
|
||||
snprintf(logBuf, sizeof(logBuf), "elect timer reset, ms:%d", ms);
|
||||
syncNodeEventLog(pSyncNode, logBuf);
|
||||
} while (0);
|
||||
*/
|
||||
SElectTimer* pElectTimer = taosMemoryMalloc(sizeof(SElectTimer));
|
||||
pElectTimer->logicClock = pSyncNode->electTimerLogicClock;
|
||||
pElectTimer->pSyncNode = pSyncNode;
|
||||
pElectTimer->pData = NULL;
|
||||
|
||||
taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pElectTimer, gSyncEnv->pTimerManager,
|
||||
&pSyncNode->pElectTimer);
|
||||
|
||||
} else {
|
||||
sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
|
||||
|
@ -1585,11 +1581,10 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
|
|||
|
||||
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
|
||||
int32_t ret = 0;
|
||||
atomic_add_fetch_64(&pSyncNode->electTimerLogicClockUser, 1);
|
||||
atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);
|
||||
taosTmrStop(pSyncNode->pElectTimer);
|
||||
pSyncNode->pElectTimer = NULL;
|
||||
|
||||
// sTrace("vgId:%d, sync %s stop elect timer", pSyncNode->vgId, syncUtilState2String(pSyncNode->state));
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -1815,8 +1810,6 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
|
|||
cJSON_AddNumberToObject(pRoot, "electTimerMS", pSyncNode->electTimerMS);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->electTimerLogicClock);
|
||||
cJSON_AddStringToObject(pRoot, "electTimerLogicClock", u64buf);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->electTimerLogicClockUser);
|
||||
cJSON_AddStringToObject(pRoot, "electTimerLogicClockUser", u64buf);
|
||||
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpElectTimerCB);
|
||||
cJSON_AddStringToObject(pRoot, "FpElectTimerCB", u64buf);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->electTimerCounter);
|
||||
|
@ -1922,7 +1915,7 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
|
|||
snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
|
||||
pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
|
||||
pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
|
||||
pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, peerStateStr, printStr);
|
||||
pSyncNode->electTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser, peerStateStr, printStr);
|
||||
} else {
|
||||
snprintf(logBuf, sizeof(logBuf), "%s", str);
|
||||
}
|
||||
|
@ -1946,7 +1939,7 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
|
|||
snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
|
||||
pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
|
||||
pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
|
||||
pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, peerStateStr, printStr);
|
||||
pSyncNode->electTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser, peerStateStr, printStr);
|
||||
} else {
|
||||
snprintf(s, len, "%s", str);
|
||||
}
|
||||
|
@ -2000,7 +1993,7 @@ inline void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) {
|
|||
snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
|
||||
pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
|
||||
pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
|
||||
pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, printStr);
|
||||
pSyncNode->electTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser, printStr);
|
||||
} else {
|
||||
snprintf(logBuf, sizeof(logBuf), "%s", str);
|
||||
}
|
||||
|
@ -2022,7 +2015,7 @@ inline void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) {
|
|||
snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
|
||||
pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
|
||||
pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
|
||||
pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, printStr);
|
||||
pSyncNode->electTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser, printStr);
|
||||
} else {
|
||||
snprintf(s, len, "%s", str);
|
||||
}
|
||||
|
@ -2789,38 +2782,46 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
|
|||
}
|
||||
|
||||
static void syncNodeEqElectTimer(void* param, void* tmrId) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)param;
|
||||
if (atomic_load_64(&pSyncNode->electTimerLogicClockUser) <= atomic_load_64(&pSyncNode->electTimerLogicClock)) {
|
||||
SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_ELECTION, atomic_load_64(&pSyncNode->electTimerLogicClock),
|
||||
pSyncNode->electTimerMS, pSyncNode->vgId, pSyncNode);
|
||||
SRpcMsg rpcMsg;
|
||||
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
|
||||
syncRpcMsgLog2((char*)"==syncNodeEqElectTimer==", &rpcMsg);
|
||||
if (pSyncNode->FpEqMsg != NULL) {
|
||||
int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
|
||||
if (code != 0) {
|
||||
sError("vgId:%d, sync enqueue elect msg error, code:%d", pSyncNode->vgId, code);
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
syncTimeoutDestroy(pSyncMsg);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
sTrace("syncNodeEqElectTimer FpEqMsg is NULL");
|
||||
}
|
||||
syncTimeoutDestroy(pSyncMsg);
|
||||
SElectTimer* pElectTimer = (SElectTimer*)param;
|
||||
SSyncNode* pSyncNode = pElectTimer->pSyncNode;
|
||||
|
||||
// reset timer ms
|
||||
if (syncEnvIsStart() && pSyncNode->electBaseLine > 0) {
|
||||
pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
|
||||
taosTmrReset(syncNodeEqElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
|
||||
&pSyncNode->pElectTimer);
|
||||
} else {
|
||||
sError("sync env is stop, syncNodeEqElectTimer");
|
||||
SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pSyncNode->electTimerMS,
|
||||
pSyncNode->vgId, pSyncNode);
|
||||
SRpcMsg rpcMsg;
|
||||
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
|
||||
if (pSyncNode->FpEqMsg != NULL) {
|
||||
int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
|
||||
if (code != 0) {
|
||||
sError("vgId:%d, sync enqueue elect msg error, code:%d", pSyncNode->vgId, code);
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
syncTimeoutDestroy(pSyncMsg);
|
||||
taosMemoryFree(pElectTimer);
|
||||
return;
|
||||
}
|
||||
|
||||
do {
|
||||
char logBuf[128];
|
||||
snprintf(logBuf, sizeof(logBuf), "eq elect timer lc:%" PRIu64, pSyncMsg->logicClock);
|
||||
syncNodeEventLog(pSyncNode, logBuf);
|
||||
} while (0);
|
||||
|
||||
} else {
|
||||
sTrace("==syncNodeEqElectTimer== electTimerLogicClock:%" PRIu64 ", electTimerLogicClockUser:%" PRIu64,
|
||||
pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser);
|
||||
sTrace("syncNodeEqElectTimer FpEqMsg is NULL");
|
||||
}
|
||||
|
||||
syncTimeoutDestroy(pSyncMsg);
|
||||
taosMemoryFree(pElectTimer);
|
||||
|
||||
#if 0
|
||||
// reset timer ms
|
||||
if (syncEnvIsStart() && pSyncNode->electBaseLine > 0) {
|
||||
pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
|
||||
taosTmrReset(syncNodeEqElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
|
||||
&pSyncNode->pElectTimer);
|
||||
} else {
|
||||
sError("sync env is stop, syncNodeEqElectTimer");
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
|
||||
|
@ -3000,16 +3001,6 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) {
|
|||
// on message ----
|
||||
int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
|
||||
// log state
|
||||
char logBuf[1024] = {0};
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"==syncNodeOnPingCb== vgId:%d, state: %d, %s, term:%" PRIu64 " electTimerLogicClock:%" PRIu64
|
||||
", "
|
||||
"electTimerLogicClockUser:%" PRIu64 ", electTimerMS:%d",
|
||||
ths->vgId, ths->state, syncUtilState2String(ths->state), ths->pRaftStore->currentTerm,
|
||||
ths->electTimerLogicClock, ths->electTimerLogicClockUser, ths->electTimerMS);
|
||||
|
||||
int32_t ret = 0;
|
||||
syncPingLog2(logBuf, pMsg);
|
||||
SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->myRaftId, &pMsg->srcId, ths->vgId);
|
||||
SRpcMsg rpcMsg;
|
||||
syncPingReply2RpcMsg(pMsgReply, &rpcMsg);
|
||||
|
@ -3024,7 +3015,7 @@ int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
|
|||
syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
|
||||
syncPingReplyDestroy(pMsgReply);
|
||||
|
||||
return ret;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
|
||||
|
|
|
@ -113,10 +113,8 @@ int32_t syncNodeOnTimer(SSyncNode* ths, SyncTimeout* pMsg) {
|
|||
}
|
||||
|
||||
} else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) {
|
||||
if (atomic_load_64(&ths->electTimerLogicClockUser) <= pMsg->logicClock) {
|
||||
if (atomic_load_64(&ths->electTimerLogicClock) <= pMsg->logicClock) {
|
||||
++(ths->electTimerCounter);
|
||||
sTrace("vgId:%d, sync timer, type:election count:%" PRIu64 ", lc-user:%" PRIu64, ths->vgId,
|
||||
ths->electTimerCounter, ths->electTimerLogicClockUser);
|
||||
|
||||
syncNodeElect(ths);
|
||||
}
|
||||
|
|
|
@ -66,7 +66,12 @@ bool voteGrantedMajority(SVotesGranted *pVotesGranted) {
|
|||
|
||||
void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) {
|
||||
ASSERT(pMsg->voteGranted == true);
|
||||
ASSERT(pMsg->term == pVotesGranted->term);
|
||||
|
||||
if (pMsg->term != pVotesGranted->term) {
|
||||
syncNodeEventLog(pVotesGranted->pSyncNode, "vote grant vnode error");
|
||||
return;
|
||||
}
|
||||
|
||||
ASSERT(syncUtilSameId(&pVotesGranted->pSyncNode->myRaftId, &pMsg->destId));
|
||||
|
||||
int j = -1;
|
||||
|
@ -201,7 +206,11 @@ bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId) {
|
|||
}
|
||||
|
||||
void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg) {
|
||||
ASSERT(pVotesRespond->term == pMsg->term);
|
||||
if (pVotesRespond->term != pMsg->term) {
|
||||
syncNodeEventLog(pVotesRespond->pSyncNode, "vote respond add error");
|
||||
return;
|
||||
}
|
||||
|
||||
for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
|
||||
if (syncUtilSameId(&((*(pVotesRespond->replicas))[i]), &pMsg->srcId)) {
|
||||
// ASSERT(pVotesRespond->isRespond[i] == false);
|
||||
|
|
|
@ -91,48 +91,6 @@ void* taosArrayAddBatch(SArray* pArray, const void* pData, int32_t nEles) {
|
|||
return dst;
|
||||
}
|
||||
|
||||
void taosArrayRemoveBatch(SArray* pArray, const int32_t* pData, int32_t numOfElems) {
|
||||
assert(pArray != NULL && pData != NULL);
|
||||
if (numOfElems <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
size_t size = taosArrayGetSize(pArray);
|
||||
if (numOfElems >= size) {
|
||||
taosArrayClear(pArray);
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t i = pData[0] + 1, j = 0;
|
||||
while (i < size) {
|
||||
if (j == numOfElems - 1) {
|
||||
break;
|
||||
}
|
||||
|
||||
char* p = TARRAY_GET_ELEM(pArray, i);
|
||||
if (i > pData[j] && i < pData[j + 1]) {
|
||||
char* dst = TARRAY_GET_ELEM(pArray, i - (j + 1));
|
||||
memmove(dst, p, pArray->elemSize);
|
||||
} else if (i == pData[j + 1]) {
|
||||
j += 1;
|
||||
}
|
||||
|
||||
i += 1;
|
||||
}
|
||||
|
||||
assert(i == pData[numOfElems - 1] + 1 && i <= size);
|
||||
|
||||
int32_t srcIndex = pData[numOfElems - 1] + 1;
|
||||
int32_t dstIndex = pData[numOfElems - 1] - numOfElems + 1;
|
||||
if (pArray->size - srcIndex > 0) {
|
||||
char* dst = TARRAY_GET_ELEM(pArray, dstIndex);
|
||||
char* src = TARRAY_GET_ELEM(pArray, srcIndex);
|
||||
memmove(dst, src, pArray->elemSize * (pArray->size - srcIndex));
|
||||
}
|
||||
|
||||
pArray->size -= numOfElems;
|
||||
}
|
||||
|
||||
void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*)) {
|
||||
assert(pArray);
|
||||
|
||||
|
@ -435,17 +393,6 @@ void taosArraySortString(SArray* pArray, __compar_fn_t comparFn) {
|
|||
taosSort(pArray->pData, pArray->size, pArray->elemSize, comparFn);
|
||||
}
|
||||
|
||||
char* taosArraySearchString(const SArray* pArray, const char* key, __compar_fn_t comparFn, int32_t flags) {
|
||||
assert(pArray != NULL);
|
||||
assert(key != NULL);
|
||||
|
||||
void* p = taosbsearch(&key, pArray->pData, pArray->size, pArray->elemSize, comparFn, flags);
|
||||
if (p == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
return *(char**)p;
|
||||
}
|
||||
|
||||
static int32_t taosArrayPartition(SArray* pArray, int32_t i, int32_t j, __ext_compar_fn_t fn, const void* userData) {
|
||||
void* key = taosArrayGetP(pArray, i);
|
||||
while (i < j) {
|
||||
|
@ -543,26 +490,7 @@ void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t
|
|||
void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* param) {
|
||||
taosArrayGetSize(pArray) > 8 ? taosArrayQuickSort(pArray, fn, param) : taosArrayInsertSort(pArray, fn, param);
|
||||
}
|
||||
// TODO(yihaoDeng) add order array<type>
|
||||
//
|
||||
|
||||
char* taosShowStrArray(const SArray* pArray) {
|
||||
int32_t sz = pArray->size;
|
||||
int32_t tlen = 0;
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
tlen += strlen(taosArrayGetP(pArray, i)) + 1;
|
||||
}
|
||||
char* res = taosMemoryCalloc(1, tlen);
|
||||
char* buf = res;
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
char* str = taosArrayGetP(pArray, i);
|
||||
int32_t len = strlen(str);
|
||||
memcpy(buf, str, len);
|
||||
buf += len;
|
||||
if (i != sz - 1) *buf = ',';
|
||||
}
|
||||
return res;
|
||||
}
|
||||
void taosArraySwap(SArray* a, SArray* b) {
|
||||
if (a == NULL || b == NULL) return;
|
||||
size_t t = a->size;
|
||||
|
|
|
@ -11,11 +11,15 @@ class TDTestCase:
|
|||
|
||||
def init(self, conn, logSql):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor())
|
||||
#tdSql.init(conn.cursor())
|
||||
tdSql.init(conn.cursor(), logSql) # output sql.txt file
|
||||
|
||||
def run(self):
|
||||
dbname = "db"
|
||||
tbname = "tb"
|
||||
stbname = "stb"
|
||||
ctbname1 = "ctb1"
|
||||
ctbname2 = "ctb2"
|
||||
|
||||
tdSql.prepare()
|
||||
|
||||
|
@ -621,6 +625,32 @@ class TDTestCase:
|
|||
tdSql.execute(f"insert into {dbname}.{tbname} values ('2020-02-01 00:00:05', 5, 5, 5, 5, 5.0, 5.0, true, 'varchar', 'nchar')")
|
||||
tdSql.execute(f"insert into {dbname}.{tbname} values ('2020-02-11 00:00:05', 15, 15, 15, 15, 15.0, 15.0, true, 'varchar', 'nchar')")
|
||||
|
||||
tdSql.execute(
|
||||
f'''create stable if not exists {dbname}.{stbname}
|
||||
(ts timestamp, c0 tinyint, c1 smallint, c2 int, c3 bigint, c4 double, c5 float, c6 bool, c7 varchar(10), c8 nchar(10)) tags(t1 int)
|
||||
'''
|
||||
)
|
||||
|
||||
|
||||
tdSql.execute(
|
||||
f'''create table if not exists {dbname}.{ctbname1} using {dbname}.{stbname} tags(1)
|
||||
'''
|
||||
)
|
||||
|
||||
tdSql.execute(
|
||||
f'''create table if not exists {dbname}.{ctbname2} using {dbname}.{stbname} tags(1)
|
||||
'''
|
||||
)
|
||||
|
||||
tdSql.execute(f"insert into {dbname}.{ctbname1} values ('2020-02-01 00:00:05', 5, 5, 5, 5, 5.0, 5.0, true, 'varchar', 'nchar')")
|
||||
tdSql.execute(f"insert into {dbname}.{ctbname1} values ('2020-02-01 00:00:10', 10, 10, 10, 10, 10.0, 10.0, true, 'varchar', 'nchar')")
|
||||
tdSql.execute(f"insert into {dbname}.{ctbname1} values ('2020-02-01 00:00:15', 15, 15, 15, 15, 15.0, 15.0, true, 'varchar', 'nchar')")
|
||||
|
||||
tdSql.execute(f"insert into {dbname}.{ctbname2} values ('2020-02-02 00:00:05', 5, 5, 5, 5, 5.0, 5.0, true, 'varchar', 'nchar')")
|
||||
tdSql.execute(f"insert into {dbname}.{ctbname2} values ('2020-02-02 00:00:10', 10, 10, 10, 10, 10.0, 10.0, true, 'varchar', 'nchar')")
|
||||
tdSql.execute(f"insert into {dbname}.{ctbname2} values ('2020-02-02 00:00:15', 15, 15, 15, 15, 15.0, 15.0, true, 'varchar', 'nchar')")
|
||||
|
||||
|
||||
tdSql.execute(f"flush database {dbname}");
|
||||
|
||||
# test fill null
|
||||
|
@ -877,6 +907,21 @@ class TDTestCase:
|
|||
tdSql.error(f"select interp('abcd') from {dbname}.{tbname} range('2020-02-10 00:00:05', '2020-02-15 00:00:05') every(1d) fill(null)")
|
||||
tdSql.error(f"select interp('中文字符') from {dbname}.{tbname} range('2020-02-10 00:00:05', '2020-02-15 00:00:05') every(1d) fill(null)")
|
||||
|
||||
tdLog.printNoPrefix("==========step12:stable cases")
|
||||
|
||||
#tdSql.query(f"select interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:04', '2020-02-01 00:00:16') every(1s) fill(null)")
|
||||
#tdSql.checkRows(13)
|
||||
|
||||
#tdSql.query(f"select interp(c0) from {dbname}.{ctbname1} range('2020-02-01 00:00:04', '2020-02-01 00:00:16') every(1s) fill(null)")
|
||||
#tdSql.checkRows(13)
|
||||
|
||||
#tdSql.query(f"select interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:04', '2020-02-02 00:00:16') every(1s) fill(null)")
|
||||
#tdSql.checkRows(13)
|
||||
|
||||
#tdSql.query(f"select _irowts,interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:04', '2020-02-02 00:00:16') every(1h) fill(prev)")
|
||||
#tdSql.query(f"select tbname,_irowts,interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:04', '2020-02-02 00:00:16') every(1h) fill(prev)")
|
||||
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
|
Loading…
Reference in New Issue