Merge branch '3.0' into feature/TD-11274-3.0

This commit is contained in:
Cary Xu 2022-07-16 17:12:56 +08:00
commit 01604d6d1d
35 changed files with 1459 additions and 769 deletions

View File

@ -111,6 +111,9 @@ int32_t derivativeScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalar
int32_t irateScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t irateScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t twaScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t twaScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t mavgScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t mavgScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t hllScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t csumScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t diffScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -332,7 +332,6 @@ int32_t* taosGetErrno();
#define TSDB_CODE_VND_INVALID_TABLE_ACTION TAOS_DEF_ERROR_CODE(0, 0x0519) #define TSDB_CODE_VND_INVALID_TABLE_ACTION TAOS_DEF_ERROR_CODE(0, 0x0519)
#define TSDB_CODE_VND_COL_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051a) #define TSDB_CODE_VND_COL_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051a)
#define TSDB_CODE_VND_TABLE_COL_NOT_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051b) #define TSDB_CODE_VND_TABLE_COL_NOT_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051b)
#define TSDB_CODE_VND_READ_END TAOS_DEF_ERROR_CODE(0, 0x051c)
// tsdb // tsdb
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) #define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600)

View File

@ -142,6 +142,7 @@ void taos_close(TAOS *taos) {
int taos_errno(TAOS_RES *res) { int taos_errno(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ_META(res)) { if (res == NULL || TD_RES_TMQ_META(res)) {
if (terrno == TSDB_CODE_RPC_REDIRECT) terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL;
return terrno; return terrno;
} }
@ -149,11 +150,13 @@ int taos_errno(TAOS_RES *res) {
return 0; return 0;
} }
return ((SRequestObj *)res)->code; return ((SRequestObj *)res)->code == TSDB_CODE_RPC_REDIRECT ? TSDB_CODE_RPC_NETWORK_UNAVAIL
: ((SRequestObj *)res)->code;
} }
const char *taos_errstr(TAOS_RES *res) { const char *taos_errstr(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ_META(res)) { if (res == NULL || TD_RES_TMQ_META(res)) {
if (terrno == TSDB_CODE_RPC_REDIRECT) terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL;
return (const char *)tstrerror(terrno); return (const char *)tstrerror(terrno);
} }
@ -165,7 +168,8 @@ const char *taos_errstr(TAOS_RES *res) {
if (NULL != pRequest->msgBuf && (strlen(pRequest->msgBuf) > 0 || pRequest->code == TSDB_CODE_RPC_FQDN_ERROR)) { if (NULL != pRequest->msgBuf && (strlen(pRequest->msgBuf) > 0 || pRequest->code == TSDB_CODE_RPC_FQDN_ERROR)) {
return pRequest->msgBuf; return pRequest->msgBuf;
} else { } else {
return (const char *)tstrerror(pRequest->code); return pRequest->code == TSDB_CODE_RPC_REDIRECT ? (const char *)tstrerror(TSDB_CODE_RPC_NETWORK_UNAVAIL)
: (const char *)tstrerror(pRequest->code);
} }
} }

View File

@ -138,7 +138,7 @@ void *tsdbGetIdx(SMeta *pMeta);
void *tsdbGetIvtIdx(SMeta *pMeta); void *tsdbGetIvtIdx(SMeta *pMeta);
int32_t tsdbLastRowReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, void **pReader); int32_t tsdbLastRowReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, void **pReader);
int32_t tsdbRetrieveLastRow(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray* pTableUids); int32_t tsdbRetrieveLastRow(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids);
int32_t tsdbLastrowReaderClose(void *pReader); int32_t tsdbLastrowReaderClose(void *pReader);
int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid);

View File

@ -24,12 +24,12 @@ extern "C" {
// tsdbDebug ================ // tsdbDebug ================
// clang-format off // clang-format off
#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TSDB FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) #define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TSD FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TSDB ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) #define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TSD ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TSDB WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0) #define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TSD WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TSDB ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) #define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TSD ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSDB ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0) #define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSD ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0)
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) #define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSD ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on // clang-format on
typedef struct TSDBROW TSDBROW; typedef struct TSDBROW TSDBROW;
@ -115,7 +115,6 @@ int32_t tGetBlock(uint8_t *p, void *ph);
int32_t tBlockCmprFn(const void *p1, const void *p2); int32_t tBlockCmprFn(const void *p1, const void *p2);
bool tBlockHasSma(SBlock *pBlock); bool tBlockHasSma(SBlock *pBlock);
// SBlockIdx // SBlockIdx
void tBlockIdxReset(SBlockIdx *pBlockIdx);
int32_t tPutBlockIdx(uint8_t *p, void *ph); int32_t tPutBlockIdx(uint8_t *p, void *ph);
int32_t tGetBlockIdx(uint8_t *p, void *ph); int32_t tGetBlockIdx(uint8_t *p, void *ph);
int32_t tCmprBlockIdx(void const *lhs, void const *rhs); int32_t tCmprBlockIdx(void const *lhs, void const *rhs);
@ -126,6 +125,8 @@ void tColDataClear(void *ph);
int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal); int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal);
int32_t tColDataGetValue(SColData *pColData, int32_t iRow, SColVal *pColVal); int32_t tColDataGetValue(SColData *pColData, int32_t iRow, SColVal *pColVal);
int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest); int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest);
int32_t tPutColData(uint8_t *p, SColData *pColData);
int32_t tGetColData(uint8_t *p, SColData *pColData);
// SBlockData // SBlockData
#define tBlockDataFirstRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, 0) #define tBlockDataFirstRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, 0)
#define tBlockDataLastRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, (PBLOCKDATA)->nRow - 1) #define tBlockDataLastRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, (PBLOCKDATA)->nRow - 1)
@ -134,14 +135,17 @@ int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest);
int32_t tBlockDataInit(SBlockData *pBlockData); int32_t tBlockDataInit(SBlockData *pBlockData);
void tBlockDataReset(SBlockData *pBlockData); void tBlockDataReset(SBlockData *pBlockData);
int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema); int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema);
int32_t tBlockDataCorrectSchema(SBlockData *pBlockData, SBlockData *pBlockDataFrom);
void tBlockDataClearData(SBlockData *pBlockData); void tBlockDataClearData(SBlockData *pBlockData);
void tBlockDataClear(SBlockData *pBlockData); void tBlockDataClear(SBlockData *pBlockData, int8_t deepClear);
int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData); int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData);
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema); int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema);
int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData); int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData);
int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest); int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest);
SColData *tBlockDataGetColDataByIdx(SBlockData *pBlockData, int32_t idx); SColData *tBlockDataGetColDataByIdx(SBlockData *pBlockData, int32_t idx);
void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData); void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData);
int32_t tPutBlockData(uint8_t *p, SBlockData *pBlockData);
int32_t tGetBlockData(uint8_t *p, SBlockData *pBlockData);
// SDelIdx // SDelIdx
int32_t tPutDelIdx(uint8_t *p, void *ph); int32_t tPutDelIdx(uint8_t *p, void *ph);
int32_t tGetDelIdx(uint8_t *p, void *ph); int32_t tGetDelIdx(uint8_t *p, void *ph);
@ -202,7 +206,7 @@ int32_t tsdbFSStateUpsertDelFile(STsdbFSState *pState, SDelFile *pDelFile);
int32_t tsdbFSStateUpsertDFileSet(STsdbFSState *pState, SDFileSet *pSet); int32_t tsdbFSStateUpsertDFileSet(STsdbFSState *pState, SDFileSet *pSet);
void tsdbFSStateDeleteDFileSet(STsdbFSState *pState, int32_t fid); void tsdbFSStateDeleteDFileSet(STsdbFSState *pState, int32_t fid);
SDelFile *tsdbFSStateGetDelFile(STsdbFSState *pState); SDelFile *tsdbFSStateGetDelFile(STsdbFSState *pState);
SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid); SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid, int32_t flag);
// tsdbReaderWriter.c ============================================================================================== // tsdbReaderWriter.c ==============================================================================================
// SDataFWriter // SDataFWriter
int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet); int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet);
@ -357,10 +361,6 @@ struct TSDBROW {
struct SBlockIdx { struct SBlockIdx {
int64_t suid; int64_t suid;
int64_t uid; int64_t uid;
TSKEY minKey;
TSKEY maxKey;
int64_t minVersion;
int64_t maxVersion;
int64_t offset; int64_t offset;
int64_t size; int64_t size;
}; };

View File

@ -312,6 +312,7 @@ void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
struct SSnapDataHdr { struct SSnapDataHdr {
int8_t type; int8_t type;
int64_t index;
int64_t size; int64_t size;
uint8_t data[]; uint8_t data[];
}; };

View File

@ -26,60 +26,72 @@ struct SMetaSnapReader {
int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader) { int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader) {
int32_t code = 0; int32_t code = 0;
int32_t c = 0; int32_t c = 0;
SMetaSnapReader* pMetaSnapReader = NULL; SMetaSnapReader* pReader = NULL;
// alloc // alloc
pMetaSnapReader = (SMetaSnapReader*)taosMemoryCalloc(1, sizeof(*pMetaSnapReader)); pReader = (SMetaSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
if (pMetaSnapReader == NULL) { if (pReader == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
pMetaSnapReader->pMeta = pMeta; pReader->pMeta = pMeta;
pMetaSnapReader->sver = sver; pReader->sver = sver;
pMetaSnapReader->ever = ever; pReader->ever = ever;
// impl // impl
code = tdbTbcOpen(pMeta->pTbDb, &pMetaSnapReader->pTbc, NULL); code = tdbTbcOpen(pMeta->pTbDb, &pReader->pTbc, NULL);
if (code) { if (code) {
taosMemoryFree(pReader);
goto _err; goto _err;
} }
code = tdbTbcMoveTo(pMetaSnapReader->pTbc, &(STbDbKey){.version = sver, .uid = INT64_MIN}, sizeof(STbDbKey), &c); code = tdbTbcMoveTo(pReader->pTbc, &(STbDbKey){.version = sver, .uid = INT64_MIN}, sizeof(STbDbKey), &c);
if (code) { if (code) {
taosMemoryFree(pReader);
goto _err; goto _err;
} }
*ppReader = pMetaSnapReader; metaInfo("vgId:%d vnode snapshot meta reader opened", TD_VID(pMeta->pVnode));
*ppReader = pReader;
return code; return code;
_err: _err:
metaError("vgId:%d meta snap reader open failed since %s", TD_VID(pMeta->pVnode), tstrerror(code)); metaError("vgId:%d vnode snapshot meta reader open failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
*ppReader = NULL; *ppReader = NULL;
return code; return code;
} }
int32_t metaSnapReaderClose(SMetaSnapReader** ppReader) { int32_t metaSnapReaderClose(SMetaSnapReader** ppReader) {
int32_t code = 0;
tdbTbcClose((*ppReader)->pTbc); tdbTbcClose((*ppReader)->pTbc);
taosMemoryFree(*ppReader); taosMemoryFree(*ppReader);
*ppReader = NULL; *ppReader = NULL;
return 0;
return code;
} }
int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) { int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0;
const void* pKey = NULL; const void* pKey = NULL;
const void* pData = NULL; const void* pData = NULL;
int32_t nKey = 0; int32_t nKey = 0;
int32_t nData = 0; int32_t nData = 0;
int32_t code = 0; STbDbKey key;
*ppData = NULL;
for (;;) { for (;;) {
code = tdbTbcGet(pReader->pTbc, &pKey, &nKey, &pData, &nData); if (tdbTbcGet(pReader->pTbc, &pKey, &nKey, &pData, &nData)) {
if (code || ((STbDbKey*)pData)->version > pReader->ever) {
code = TSDB_CODE_VND_READ_END;
goto _exit; goto _exit;
} }
if (((STbDbKey*)pData)->version < pReader->sver) { key = ((STbDbKey*)pKey)[0];
if (key.version > pReader->ever) {
goto _exit;
}
if (key.version < pReader->sver) {
tdbTbcMoveToNext(pReader->pTbc); tdbTbcMoveToNext(pReader->pTbc);
continue; continue;
} }
@ -88,17 +100,28 @@ int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
break; break;
} }
// copy the data ASSERT(pData && nData);
if (tRealloc(ppData, sizeof(SSnapDataHdr) + nData) < 0) {
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + nData);
if (*ppData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
return code; goto _err;
} }
((SSnapDataHdr*)(*ppData))->type = 0; // TODO: use macro
((SSnapDataHdr*)(*ppData))->size = nData; SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
memcpy(((SSnapDataHdr*)(*ppData))->data, pData, nData); pHdr->type = 0; // TODO: use macro
pHdr->size = nData;
memcpy(pHdr->data, pData, nData);
metaInfo("vgId:%d vnode snapshot meta read data, version:%" PRId64 " uid:%" PRId64 " nData:%d",
TD_VID(pReader->pMeta->pVnode), key.version, key.uid, nData);
_exit: _exit:
return code; return code;
_err:
metaError("vgId:%d vnode snapshot meta read data failed since %s", TD_VID(pReader->pMeta->pVnode), tstrerror(code));
return code;
} }
// SMetaSnapWriter ======================================== // SMetaSnapWriter ========================================
@ -108,18 +131,6 @@ struct SMetaSnapWriter {
int64_t ever; int64_t ever;
}; };
static int32_t metaSnapRollback(SMetaSnapWriter* pWriter) {
int32_t code = 0;
// TODO
return code;
}
static int32_t metaSnapCommit(SMetaSnapWriter* pWriter) {
int32_t code = 0;
// TODO
return code;
}
int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWriter** ppWriter) { int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWriter** ppWriter) {
int32_t code = 0; int32_t code = 0;
SMetaSnapWriter* pWriter; SMetaSnapWriter* pWriter;
@ -148,10 +159,9 @@ int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback) {
SMetaSnapWriter* pWriter = *ppWriter; SMetaSnapWriter* pWriter = *ppWriter;
if (rollback) { if (rollback) {
code = metaSnapRollback(pWriter); ASSERT(0);
if (code) goto _err;
} else { } else {
code = metaSnapCommit(pWriter); code = metaCommit(pWriter->pMeta);
if (code) goto _err; if (code) goto _err;
} }
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
@ -170,15 +180,16 @@ int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData)
SMetaEntry metaEntry = {0}; SMetaEntry metaEntry = {0};
SDecoder* pDecoder = &(SDecoder){0}; SDecoder* pDecoder = &(SDecoder){0};
tDecoderInit(pDecoder, pData, nData); tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
metaDecodeEntry(pDecoder, &metaEntry); metaDecodeEntry(pDecoder, &metaEntry);
code = metaHandleEntry(pMeta, &metaEntry); code = metaHandleEntry(pMeta, &metaEntry);
if (code) goto _err; if (code) goto _err;
tDecoderClear(pDecoder);
return code; return code;
_err: _err:
metaError("vgId:%d meta snapshot write failed since %s", TD_VID(pMeta->pVnode), tstrerror(code)); metaError("vgId:%d vnode snapshot meta write failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
return code; return code;
} }

View File

@ -476,9 +476,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
if (--state->iFileSet >= 0) { if (--state->iFileSet >= 0) {
pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet); pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet);
} else { } else {
// tBlockDataClear(&state->blockData); // tBlockDataClear(&state->blockData, 1);
if (state->pBlockData) { if (state->pBlockData) {
tBlockDataClear(state->pBlockData); tBlockDataClear(state->pBlockData, 1);
state->pBlockData = NULL; state->pBlockData = NULL;
} }
@ -500,9 +500,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
if (code) goto _err; if (code) goto _err;
/* if (state->pBlockIdx) { */ /* if (state->pBlockIdx) { */
/* tBlockIdxReset(state->blockIdx); */
/* } */ /* } */
/* tBlockIdxReset(state->blockIdx); */
/* code = tMapDataSearch(&state->blockIdxMap, state->pBlockIdxExp, tGetBlockIdx, tCmprBlockIdx, /* code = tMapDataSearch(&state->blockIdxMap, state->pBlockIdxExp, tGetBlockIdx, tCmprBlockIdx,
* &state->blockIdx); * &state->blockIdx);
*/ */
@ -582,8 +580,8 @@ _err:
state->aBlockIdx = NULL; state->aBlockIdx = NULL;
} }
if (state->pBlockData) { if (state->pBlockData) {
// tBlockDataClear(&state->blockData); // tBlockDataClear(&state->blockData, 1);
tBlockDataClear(state->pBlockData); tBlockDataClear(state->pBlockData, 1);
state->pBlockData = NULL; state->pBlockData = NULL;
} }
@ -609,8 +607,8 @@ int32_t clearNextRowFromFS(void *iter) {
state->aBlockIdx = NULL; state->aBlockIdx = NULL;
} }
if (state->pBlockData) { if (state->pBlockData) {
// tBlockDataClear(&state->blockData); // tBlockDataClear(&state->blockData, 1);
tBlockDataClear(state->pBlockData); tBlockDataClear(state->pBlockData, 1);
state->pBlockData = NULL; state->pBlockData = NULL;
} }

View File

@ -263,7 +263,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
taosArrayClear(pCommitter->aBlockIdx); taosArrayClear(pCommitter->aBlockIdx);
tMapDataReset(&pCommitter->oBlockMap); tMapDataReset(&pCommitter->oBlockMap);
tBlockDataReset(&pCommitter->oBlockData); tBlockDataReset(&pCommitter->oBlockData);
pRSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, pCommitter->commitFid); pRSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, pCommitter->commitFid, TD_EQ);
if (pRSet) { if (pRSet) {
code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pRSet); code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pRSet);
if (code) goto _err; if (code) goto _err;
@ -284,16 +284,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
.fLast = {.commitID = pCommitter->commitID, .size = 0}, .fLast = {.commitID = pCommitter->commitID, .size = 0},
.fSma = pRSet->fSma}; .fSma = pRSet->fSma};
} else { } else {
STfs *pTfs = pTsdb->pVnode->pTfs; wSet = (SDFileSet){.diskId = (SDiskID){.level = 0, .id = 0},
SDiskID did = {.level = 0, .id = 0};
// TODO: alloc a new disk
// tfsAllocDisk(pTfs, 0, &did);
// create the directory
tfsMkdirRecurAt(pTfs, pTsdb->path, did);
wSet = (SDFileSet){.diskId = did,
.fid = pCommitter->commitFid, .fid = pCommitter->commitFid,
.fHead = {.commitID = pCommitter->commitID, .offset = 0, .size = 0}, .fHead = {.commitID = pCommitter->commitID, .offset = 0, .size = 0},
.fData = {.commitID = pCommitter->commitID, .size = 0}, .fData = {.commitID = pCommitter->commitID, .size = 0},
@ -1001,10 +992,10 @@ _exit:
static void tsdbCommitDataEnd(SCommitter *pCommitter) { static void tsdbCommitDataEnd(SCommitter *pCommitter) {
taosArrayDestroy(pCommitter->aBlockIdx); taosArrayDestroy(pCommitter->aBlockIdx);
tMapDataClear(&pCommitter->oBlockMap); tMapDataClear(&pCommitter->oBlockMap);
tBlockDataClear(&pCommitter->oBlockData); tBlockDataClear(&pCommitter->oBlockData, 1);
taosArrayDestroy(pCommitter->aBlockIdxN); taosArrayDestroy(pCommitter->aBlockIdxN);
tMapDataClear(&pCommitter->nBlockMap); tMapDataClear(&pCommitter->nBlockMap);
tBlockDataClear(&pCommitter->nBlockData); tBlockDataClear(&pCommitter->nBlockData, 1);
tTSchemaDestroy(pCommitter->skmTable.pTSchema); tTSchemaDestroy(pCommitter->skmTable.pTSchema);
tTSchemaDestroy(pCommitter->skmRow.pTSchema); tTSchemaDestroy(pCommitter->skmRow.pTSchema);
} }

View File

@ -698,6 +698,6 @@ void tsdbFSStateDeleteDFileSet(STsdbFSState *pState, int32_t fid) {
SDelFile *tsdbFSStateGetDelFile(STsdbFSState *pState) { return pState->pDelFile; } SDelFile *tsdbFSStateGetDelFile(STsdbFSState *pState) { return pState->pDelFile; }
SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid) { SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid, int32_t flag) {
return (SDFileSet *)taosArraySearch(pState->aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ); return (SDFileSet *)taosArraySearch(pState->aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, flag);
} }

View File

@ -63,10 +63,10 @@ typedef struct SBlockLoadSuppInfo {
} SBlockLoadSuppInfo; } SBlockLoadSuppInfo;
typedef struct SFilesetIter { typedef struct SFilesetIter {
int32_t numOfFiles; // number of total files int32_t numOfFiles; // number of total files
int32_t index; // current accessed index in the list int32_t index; // current accessed index in the list
SArray* pFileList; // data file list SArray* pFileList; // data file list
int32_t order; int32_t order;
} SFilesetIter; } SFilesetIter;
typedef struct SFileDataBlockInfo { typedef struct SFileDataBlockInfo {
@ -830,8 +830,8 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
int32_t code = tsdbReadColData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pSupInfo->colIds, numOfCols, int32_t code = tsdbReadColData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pSupInfo->colIds, numOfCols,
pBlockData, NULL, NULL); pBlockData, NULL, NULL);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
@ -1991,7 +1991,7 @@ static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pRead
static int32_t moveToNextFile(STsdbReader* pReader, int32_t* numOfBlocks) { static int32_t moveToNextFile(STsdbReader* pReader, int32_t* numOfBlocks) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
SArray* pIndexList = taosArrayInit(4, sizeof(SBlockIdx)); SArray* pIndexList = taosArrayInit(4, sizeof(SBlockIdx));
while (1) { while (1) {
bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader); bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader);
@ -3006,14 +3006,14 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData); code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tBlockDataClear(&pStatus->fileBlockData); tBlockDataClear(&pStatus->fileBlockData, 1);
terrno = code; terrno = code;
return NULL; return NULL;
} }
copyBlockDataToSDataBlock(pReader, pBlockScanInfo); copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
tBlockDataClear(&pStatus->fileBlockData); tBlockDataClear(&pStatus->fileBlockData, 1);
return pReader->pResBlock->pDataBlock; return pReader->pResBlock->pDataBlock;
} }
@ -3132,8 +3132,8 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
hasNext = (pBlockIter->numOfBlocks > 0); hasNext = (pBlockIter->numOfBlocks > 0);
} }
// tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables, // tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables,
// pReader->pFileGroup->fid, pReader->idStr); // pReader->pFileGroup->fid, pReader->idStr);
} }
return code; return code;
@ -3204,4 +3204,3 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -979,21 +979,21 @@ int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBl
code = tBlockDataCopy(pBlockData, pBlockData2); code = tBlockDataCopy(pBlockData, pBlockData2);
if (code) { if (code) {
tBlockDataClear(pBlockData1); tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2); tBlockDataClear(pBlockData2, 1);
goto _err; goto _err;
} }
code = tBlockDataMerge(pBlockData1, pBlockData2, pBlockData); code = tBlockDataMerge(pBlockData1, pBlockData2, pBlockData);
if (code) { if (code) {
tBlockDataClear(pBlockData1); tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2); tBlockDataClear(pBlockData2, 1);
goto _err; goto _err;
} }
} }
tBlockDataClear(pBlockData1); tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2); tBlockDataClear(pBlockData2, 1);
} }
tFree(pBuf1); tFree(pBuf1);
@ -1115,29 +1115,29 @@ int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *p
for (iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) { for (iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
code = tsdbReadSubBlockData(pReader, pBlockIdx, pBlock, iSubBlock, pBlockData1, ppBuf1, ppBuf2); code = tsdbReadSubBlockData(pReader, pBlockIdx, pBlock, iSubBlock, pBlockData1, ppBuf1, ppBuf2);
if (code) { if (code) {
tBlockDataClear(pBlockData1); tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2); tBlockDataClear(pBlockData2, 1);
goto _err; goto _err;
} }
code = tBlockDataCopy(pBlockData, pBlockData2); code = tBlockDataCopy(pBlockData, pBlockData2);
if (code) { if (code) {
tBlockDataClear(pBlockData1); tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2); tBlockDataClear(pBlockData2, 1);
goto _err; goto _err;
} }
// merge two block data // merge two block data
code = tBlockDataMerge(pBlockData1, pBlockData2, pBlockData); code = tBlockDataMerge(pBlockData1, pBlockData2, pBlockData);
if (code) { if (code) {
tBlockDataClear(pBlockData1); tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2); tBlockDataClear(pBlockData2, 1);
goto _err; goto _err;
} }
} }
tBlockDataClear(pBlockData1); tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2); tBlockDataClear(pBlockData2, 1);
} }
ASSERT(pBlock->nRow == pBlockData->nRow); ASSERT(pBlock->nRow == pBlockData->nRow);

File diff suppressed because it is too large Load Diff

View File

@ -36,15 +36,15 @@ int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(u
// alloc // alloc
code = tRealloc((uint8_t **)&pMapData->aOffset, sizeof(int32_t) * pMapData->nItem); code = tRealloc((uint8_t **)&pMapData->aOffset, sizeof(int32_t) * pMapData->nItem);
if (code) goto _err; if (code) goto _exit;
code = tRealloc(&pMapData->pData, pMapData->nData); code = tRealloc(&pMapData->pData, pMapData->nData);
if (code) goto _err; if (code) goto _exit;
// put // put
pMapData->aOffset[nItem] = offset; pMapData->aOffset[nItem] = offset;
tPutItemFn(pMapData->pData + offset, pItem); tPutItemFn(pMapData->pData + offset, pItem);
_err: _exit:
return code; return code;
} }
@ -189,25 +189,12 @@ static FORCE_INLINE int32_t tGetTSDBKEY(uint8_t *p, TSDBKEY *pKey) {
} }
// SBlockIdx ====================================================== // SBlockIdx ======================================================
void tBlockIdxReset(SBlockIdx *pBlockIdx) {
pBlockIdx->minKey = TSKEY_MAX;
pBlockIdx->maxKey = TSKEY_MIN;
pBlockIdx->minVersion = VERSION_MAX;
pBlockIdx->maxVersion = VERSION_MIN;
pBlockIdx->offset = -1;
pBlockIdx->size = -1;
}
int32_t tPutBlockIdx(uint8_t *p, void *ph) { int32_t tPutBlockIdx(uint8_t *p, void *ph) {
int32_t n = 0; int32_t n = 0;
SBlockIdx *pBlockIdx = (SBlockIdx *)ph; SBlockIdx *pBlockIdx = (SBlockIdx *)ph;
n += tPutI64(p ? p + n : p, pBlockIdx->suid); n += tPutI64(p ? p + n : p, pBlockIdx->suid);
n += tPutI64(p ? p + n : p, pBlockIdx->uid); n += tPutI64(p ? p + n : p, pBlockIdx->uid);
n += tPutI64(p ? p + n : p, pBlockIdx->minKey);
n += tPutI64(p ? p + n : p, pBlockIdx->maxKey);
n += tPutI64v(p ? p + n : p, pBlockIdx->minVersion);
n += tPutI64v(p ? p + n : p, pBlockIdx->maxVersion);
n += tPutI64v(p ? p + n : p, pBlockIdx->offset); n += tPutI64v(p ? p + n : p, pBlockIdx->offset);
n += tPutI64v(p ? p + n : p, pBlockIdx->size); n += tPutI64v(p ? p + n : p, pBlockIdx->size);
@ -220,10 +207,6 @@ int32_t tGetBlockIdx(uint8_t *p, void *ph) {
n += tGetI64(p + n, &pBlockIdx->suid); n += tGetI64(p + n, &pBlockIdx->suid);
n += tGetI64(p + n, &pBlockIdx->uid); n += tGetI64(p + n, &pBlockIdx->uid);
n += tGetI64(p + n, &pBlockIdx->minKey);
n += tGetI64(p + n, &pBlockIdx->maxKey);
n += tGetI64v(p + n, &pBlockIdx->minVersion);
n += tGetI64v(p + n, &pBlockIdx->maxVersion);
n += tGetI64v(p + n, &pBlockIdx->offset); n += tGetI64v(p + n, &pBlockIdx->offset);
n += tGetI64v(p + n, &pBlockIdx->size); n += tGetI64v(p + n, &pBlockIdx->size);
@ -921,6 +904,76 @@ _exit:
return code; return code;
} }
int32_t tPutColData(uint8_t *p, SColData *pColData) {
int32_t n = 0;
n += tPutI16v(p ? p + n : p, pColData->cid);
n += tPutI8(p ? p + n : p, pColData->type);
n += tPutI8(p ? p + n : p, pColData->smaOn);
n += tPutI32v(p ? p + n : p, pColData->nVal);
n += tPutU8(p ? p + n : p, pColData->flag);
if (pColData->flag == HAS_NONE || pColData->flag == HAS_NULL) goto _exit;
if (pColData->flag != HAS_VALUE) {
// bitmap
int32_t size = BIT2_SIZE(pColData->nVal);
if (p) {
memcpy(p + n, pColData->pBitMap, size);
}
n += size;
}
if (IS_VAR_DATA_TYPE(pColData->type)) {
// offset
int32_t size = sizeof(int32_t) * pColData->nVal;
if (p) {
memcpy(p + n, pColData->aOffset, size);
}
n += size;
}
n += tPutI32v(p ? p + n : p, pColData->nData);
if (p) {
memcpy(p + n, pColData->pData, pColData->nData);
}
n += pColData->nData;
_exit:
return n;
}
int32_t tGetColData(uint8_t *p, SColData *pColData) {
int32_t n = 0;
n += tGetI16v(p + n, &pColData->cid);
n += tGetI8(p + n, &pColData->type);
n += tGetI8(p + n, &pColData->smaOn);
n += tGetI32v(p + n, &pColData->nVal);
n += tGetU8(p + n, &pColData->flag);
if (pColData->flag == HAS_NONE || pColData->flag == HAS_NULL) goto _exit;
if (pColData->flag != HAS_VALUE) {
// bitmap
int32_t size = BIT2_SIZE(pColData->nVal);
pColData->pBitMap = p + n;
n += size;
}
if (IS_VAR_DATA_TYPE(pColData->type)) {
// offset
int32_t size = sizeof(int32_t) * pColData->nVal;
pColData->aOffset = (int32_t *)(p + n);
n += size;
}
n += tGetI32v(p + n, &pColData->nData);
pColData->pData = p + n;
n += pColData->nData;
_exit:
return n;
}
static FORCE_INLINE int32_t tColDataCmprFn(const void *p1, const void *p2) { static FORCE_INLINE int32_t tColDataCmprFn(const void *p1, const void *p2) {
SColData *pColData1 = (SColData *)p1; SColData *pColData1 = (SColData *)p1;
SColData *pColData2 = (SColData *)p2; SColData *pColData2 = (SColData *)p2;
@ -962,11 +1015,11 @@ void tBlockDataReset(SBlockData *pBlockData) {
taosArrayClear(pBlockData->aIdx); taosArrayClear(pBlockData->aIdx);
} }
void tBlockDataClear(SBlockData *pBlockData) { void tBlockDataClear(SBlockData *pBlockData, int8_t deepClear) {
tFree((uint8_t *)pBlockData->aVersion); tFree((uint8_t *)pBlockData->aVersion);
tFree((uint8_t *)pBlockData->aTSKEY); tFree((uint8_t *)pBlockData->aTSKEY);
taosArrayDestroy(pBlockData->aIdx); taosArrayDestroy(pBlockData->aIdx);
taosArrayDestroyEx(pBlockData->aColData, tColDataClear); taosArrayDestroyEx(pBlockData->aColData, deepClear ? tColDataClear : NULL);
} }
int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema) { int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema) {
@ -1079,6 +1132,46 @@ _err:
return code; return code;
} }
int32_t tBlockDataCorrectSchema(SBlockData *pBlockData, SBlockData *pBlockDataFrom) {
int32_t code = 0;
int32_t iColData = 0;
for (int32_t iColDataFrom = 0; iColDataFrom < taosArrayGetSize(pBlockDataFrom->aIdx); iColDataFrom++) {
SColData *pColDataFrom = tBlockDataGetColDataByIdx(pBlockDataFrom, iColDataFrom);
while (true) {
SColData *pColData;
if (iColData < taosArrayGetSize(pBlockData->aIdx)) {
pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
} else {
pColData = NULL;
}
if (pColData == NULL || pColData->cid > pColDataFrom->cid) {
code = tBlockDataAddColData(pBlockData, iColData, &pColData);
if (code) goto _exit;
tColDataInit(pColData, pColDataFrom->cid, pColDataFrom->type, pColDataFrom->smaOn);
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type));
if (code) goto _exit;
}
iColData++;
break;
} else if (pColData->cid == pColDataFrom->cid) {
iColData++;
break;
} else {
iColData++;
}
}
}
_exit:
return code;
}
int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData) { int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData) {
int32_t code = 0; int32_t code = 0;
@ -1239,6 +1332,52 @@ void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColD
*ppColData = NULL; *ppColData = NULL;
} }
int32_t tPutBlockData(uint8_t *p, SBlockData *pBlockData) {
int32_t n = 0;
n += tPutI32v(p ? p + n : p, pBlockData->nRow);
if (p) {
memcpy(p + n, pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow);
}
n = n + sizeof(int64_t) * pBlockData->nRow;
if (p) {
memcpy(p + n, pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow);
}
n = n + sizeof(TSKEY) * pBlockData->nRow;
int32_t nCol = taosArrayGetSize(pBlockData->aIdx);
n += tPutI32v(p ? p + n : p, nCol);
for (int32_t iCol = 0; iCol < nCol; iCol++) {
SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iCol);
n += tPutColData(p ? p + n : p, pColData);
}
return n;
}
int32_t tGetBlockData(uint8_t *p, SBlockData *pBlockData) {
int32_t n = 0;
tBlockDataReset(pBlockData);
n += tGetI32v(p + n, &pBlockData->nRow);
pBlockData->aVersion = (int64_t *)(p + n);
n = n + sizeof(int64_t) * pBlockData->nRow;
pBlockData->aTSKEY = (TSKEY *)(p + n);
n = n + sizeof(TSKEY) * pBlockData->nRow;
int32_t nCol;
n += tGetI32v(p + n, &nCol);
for (int32_t iCol = 0; iCol < nCol; iCol++) {
SColData *pColData;
if (tBlockDataAddColData(pBlockData, iCol, &pColData)) return -1;
n += tGetColData(p + n, pColData);
}
return n;
}
// ALGORITHM ============================== // ALGORITHM ==============================
void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) {
SColVal colVal; SColVal colVal;

View File

@ -20,13 +20,13 @@ struct SVSnapReader {
SVnode *pVnode; SVnode *pVnode;
int64_t sver; int64_t sver;
int64_t ever; int64_t ever;
int64_t index;
// meta // meta
int8_t metaDone; int8_t metaDone;
SMetaSnapReader *pMetaReader; SMetaSnapReader *pMetaReader;
// tsdb // tsdb
int8_t tsdbDone; int8_t tsdbDone;
STsdbSnapReader *pTsdbReader; STsdbSnapReader *pTsdbReader;
uint8_t *pData;
}; };
int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapReader **ppReader) { int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapReader **ppReader) {
@ -42,12 +42,7 @@ int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapRe
pReader->sver = sver; pReader->sver = sver;
pReader->ever = ever; pReader->ever = ever;
code = metaSnapReaderOpen(pVnode->pMeta, sver, ever, &pReader->pMetaReader); vInfo("vgId:%d vnode snapshot reader opened, sver:%" PRId64 " ever:%" PRId64, TD_VID(pVnode), sver, ever);
if (code) goto _err;
code = tsdbSnapReaderOpen(pVnode->pTsdb, sver, ever, &pReader->pTsdbReader);
if (code) goto _err;
*ppReader = pReader; *ppReader = pReader;
return code; return code;
@ -60,54 +55,85 @@ _err:
int32_t vnodeSnapReaderClose(SVSnapReader *pReader) { int32_t vnodeSnapReaderClose(SVSnapReader *pReader) {
int32_t code = 0; int32_t code = 0;
tFree(pReader->pData); if (pReader->pTsdbReader) {
if (pReader->pTsdbReader) tsdbSnapReaderClose(&pReader->pTsdbReader); tsdbSnapReaderClose(&pReader->pTsdbReader);
if (pReader->pMetaReader) metaSnapReaderClose(&pReader->pMetaReader); }
taosMemoryFree(pReader);
if (pReader->pMetaReader) {
metaSnapReaderClose(&pReader->pMetaReader);
}
vInfo("vgId:%d vnode snapshot reader closed", TD_VID(pReader->pVnode));
taosMemoryFree(pReader);
return code; return code;
} }
int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) { int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) {
int32_t code = 0; int32_t code = 0;
// META ==============
if (!pReader->metaDone) { if (!pReader->metaDone) {
code = metaSnapRead(pReader->pMetaReader, &pReader->pData); // open reader if not
if (pReader->pMetaReader == NULL) {
code = metaSnapReaderOpen(pReader->pVnode->pMeta, pReader->sver, pReader->ever, &pReader->pMetaReader);
if (code) goto _err;
}
code = metaSnapRead(pReader->pMetaReader, ppData);
if (code) { if (code) {
if (code == TSDB_CODE_VND_READ_END) { goto _err;
} else {
if (*ppData) {
goto _exit;
} else {
pReader->metaDone = 1; pReader->metaDone = 1;
} else { code = metaSnapReaderClose(&pReader->pMetaReader);
goto _err; if (code) goto _err;
} }
} else {
*ppData = pReader->pData;
*nData = sizeof(SSnapDataHdr) + ((SSnapDataHdr *)pReader->pData)->size;
goto _exit;
} }
} }
// TSDB ==============
if (!pReader->tsdbDone) { if (!pReader->tsdbDone) {
code = tsdbSnapRead(pReader->pTsdbReader, &pReader->pData); // open if not
if (pReader->pTsdbReader == NULL) {
code = tsdbSnapReaderOpen(pReader->pVnode->pTsdb, pReader->sver, pReader->ever, &pReader->pTsdbReader);
if (code) goto _err;
}
code = tsdbSnapRead(pReader->pTsdbReader, ppData);
if (code) { if (code) {
if (code == TSDB_CODE_VND_READ_END) { goto _err;
pReader->tsdbDone = 1;
} else {
goto _err;
}
} else { } else {
*ppData = pReader->pData; if (*ppData) {
*nData = sizeof(SSnapDataHdr) + ((SSnapDataHdr *)pReader->pData)->size; goto _exit;
goto _exit; } else {
pReader->tsdbDone = 1;
code = tsdbSnapReaderClose(&pReader->pTsdbReader);
if (code) goto _err;
}
} }
} }
code = TSDB_CODE_VND_READ_END; *ppData = NULL;
*nData = 0;
_exit: _exit:
if (*ppData) {
SSnapDataHdr *pHdr = (SSnapDataHdr *)(*ppData);
pReader->index++;
*nData = sizeof(SSnapDataHdr) + pHdr->size;
pHdr->index = pReader->index;
vInfo("vgId:%d vnode snapshot read data,index:%" PRId64 " type:%d nData:%d ", TD_VID(pReader->pVnode),
pReader->index, pHdr->type, *nData);
} else {
vInfo("vgId:%d vnode snapshot read data end, index:%" PRId64, TD_VID(pReader->pVnode), pReader->index);
}
return code; return code;
_err: _err:
vError("vgId:% snapshot read failed since %s", TD_VID(pReader->pVnode), tstrerror(code)); vError("vgId:% vnode snapshot read failed since %s", TD_VID(pReader->pVnode), tstrerror(code));
return code; return code;
} }
@ -116,24 +142,13 @@ struct SVSnapWriter {
SVnode *pVnode; SVnode *pVnode;
int64_t sver; int64_t sver;
int64_t ever; int64_t ever;
int64_t index;
// meta // meta
SMetaSnapWriter *pMetaSnapWriter; SMetaSnapWriter *pMetaSnapWriter;
// tsdb // tsdb
STsdbSnapWriter *pTsdbSnapWriter; STsdbSnapWriter *pTsdbSnapWriter;
}; };
static int32_t vnodeSnapRollback(SVSnapWriter *pWriter) {
int32_t code = 0;
// TODO
return code;
}
static int32_t vnodeSnapCommit(SVSnapWriter *pWriter) {
int32_t code = 0;
// TODO
return code;
}
int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWriter **ppWriter) { int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWriter **ppWriter) {
int32_t code = 0; int32_t code = 0;
SVSnapWriter *pWriter = NULL; SVSnapWriter *pWriter = NULL;
@ -148,62 +163,78 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr
pWriter->sver = sver; pWriter->sver = sver;
pWriter->ever = ever; pWriter->ever = ever;
vInfo("vgId:%d vnode snapshot writer opened", TD_VID(pVnode));
*ppWriter = pWriter;
return code; return code;
_err: _err:
vError("vgId:%d vnode snapshot writer open failed since %s", TD_VID(pVnode), tstrerror(code)); vError("vgId:%d vnode snapshot writer open failed since %s", TD_VID(pVnode), tstrerror(code));
*ppWriter = NULL;
return code; return code;
} }
int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback) { int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback) {
int32_t code = 0; int32_t code = 0;
if (rollback) { if (pWriter->pMetaSnapWriter) {
code = vnodeSnapRollback(pWriter); code = metaSnapWriterClose(&pWriter->pMetaSnapWriter, rollback);
if (code) goto _err;
} else {
code = vnodeSnapCommit(pWriter);
if (code) goto _err; if (code) goto _err;
} }
if (pWriter->pTsdbSnapWriter) {
code = tsdbSnapWriterClose(&pWriter->pTsdbSnapWriter, rollback);
if (code) goto _err;
}
_exit:
vInfo("vgId:%d vnode snapshot writer closed, rollback:%d", TD_VID(pWriter->pVnode), rollback);
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
return code; return code;
_err: _err:
vError("vgId:%d vnode snapshow writer close failed since %s", TD_VID(pWriter->pVnode), tstrerror(code)); vError("vgId:%d vnode snapshot writer close failed since %s", TD_VID(pWriter->pVnode), tstrerror(code));
return code; return code;
} }
int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
int32_t code = 0; int32_t code = 0;
SSnapDataHdr *pSnapDataHdr = (SSnapDataHdr *)pData; SSnapDataHdr *pHdr = (SSnapDataHdr *)pData;
SVnode *pVnode = pWriter->pVnode; SVnode *pVnode = pWriter->pVnode;
ASSERT(pSnapDataHdr->size + sizeof(SSnapDataHdr) == nData); ASSERT(pHdr->size + sizeof(SSnapDataHdr) == nData);
ASSERT(pHdr->index == pWriter->index + 1);
pWriter->index = pHdr->index;
if (pSnapDataHdr->type == 0) { vInfo("vgId:%d vnode snapshot write data, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode), pHdr->index,
pHdr->type, nData);
if (pHdr->type == 0) {
// meta // meta
if (pWriter->pMetaSnapWriter == NULL) { if (pWriter->pMetaSnapWriter == NULL) {
code = metaSnapWriterOpen(pVnode->pMeta, pWriter->sver, pWriter->ever, &pWriter->pMetaSnapWriter); code = metaSnapWriterOpen(pVnode->pMeta, pWriter->sver, pWriter->ever, &pWriter->pMetaSnapWriter);
if (code) goto _err; if (code) goto _err;
} }
code = metaSnapWrite(pWriter->pMetaSnapWriter, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); code = metaSnapWrite(pWriter->pMetaSnapWriter, pData, nData);
if (code) goto _err; if (code) goto _err;
} else { } else {
// tsdb // tsdb
if (pWriter->pTsdbSnapWriter == NULL) { if (pWriter->pTsdbSnapWriter == NULL) {
code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, &pWriter->pTsdbSnapWriter); code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, &pWriter->pTsdbSnapWriter);
if (code) goto _err; if (code) goto _err;
} }
code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pData, nData);
if (code) goto _err; if (code) goto _err;
} }
_exit:
return code; return code;
_err: _err:
vError("vgId:%d vnode snapshot write failed since %s", TD_VID(pVnode), tstrerror(code)); vError("vgId:%d vnode snapshot write failed since %s, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode),
tstrerror(code), pHdr->index, pHdr->type, nData);
return code; return code;
} }

View File

@ -521,8 +521,9 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, (STsdbReader**)&pInfo->dataReader, int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, (STsdbReader**)&pInfo->dataReader,
GET_TASKID(pTaskInfo)); GET_TASKID(pTaskInfo));
if (code != 0) { if (code != TSDB_CODE_SUCCESS) {
// TODO longjmp(pTaskInfo->env, code);
return NULL;
} }
} }

View File

@ -2369,6 +2369,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getHLLFuncEnv, .getEnvFunc = getHLLFuncEnv,
.initFunc = functionSetup, .initFunc = functionSetup,
.processFunc = hllFunction, .processFunc = hllFunction,
.sprocessFunc = hllScalarFunction,
.finalizeFunc = hllFinalize, .finalizeFunc = hllFinalize,
.invertFunc = NULL, .invertFunc = NULL,
.combineFunc = hllCombine, .combineFunc = hllCombine,
@ -2407,6 +2408,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getDiffFuncEnv, .getEnvFunc = getDiffFuncEnv,
.initFunc = diffFunctionSetup, .initFunc = diffFunctionSetup,
.processFunc = diffFunction, .processFunc = diffFunction,
.sprocessFunc = diffScalarFunction,
.finalizeFunc = functionFinalize .finalizeFunc = functionFinalize
}, },
{ {
@ -2437,6 +2439,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getCsumFuncEnv, .getEnvFunc = getCsumFuncEnv,
.initFunc = functionSetup, .initFunc = functionSetup,
.processFunc = csumFunction, .processFunc = csumFunction,
.sprocessFunc = csumScalarFunction,
.finalizeFunc = NULL .finalizeFunc = NULL
}, },
{ {
@ -2463,7 +2466,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{ {
.name = "tail", .name = "tail",
.type = FUNCTION_TYPE_TAIL, .type = FUNCTION_TYPE_TAIL,
.classification = FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | .classification = FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC |
FUNC_MGT_IMPLICIT_TS_FUNC, FUNC_MGT_IMPLICIT_TS_FUNC,
.translateFunc = translateTail, .translateFunc = translateTail,
.getEnvFunc = getTailFuncEnv, .getEnvFunc = getTailFuncEnv,
@ -2474,7 +2477,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{ {
.name = "unique", .name = "unique",
.type = FUNCTION_TYPE_UNIQUE, .type = FUNCTION_TYPE_UNIQUE,
.classification = FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | .classification = FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC |
FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
.translateFunc = translateUnique, .translateFunc = translateUnique,
.getEnvFunc = getUniqueFuncEnv, .getEnvFunc = getUniqueFuncEnv,

View File

@ -2723,6 +2723,9 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
int32_t blockDataOrder = (startKey <= endKey) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; int32_t blockDataOrder = (startKey <= endKey) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
// please ref. to the comment in lastRowFunction for the reason why disabling the opt version of last/first function.
// we will use this opt implementation in an new version that is only available in scan subplan
#if 0
if (blockDataOrder == TSDB_ORDER_ASC) { if (blockDataOrder == TSDB_ORDER_ASC) {
// filter according to current result firstly // filter according to current result firstly
if (pResInfo->numOfRes > 0) { if (pResInfo->numOfRes > 0) {
@ -2770,6 +2773,22 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
} }
} }
} }
#else
for (int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
continue;
}
numOfElems++;
char* data = colDataGetData(pInputCol, i);
TSKEY cts = getRowPTs(pInput->pPTS, i);
if (pResInfo->numOfRes == 0 || pInfo->ts > cts) {
doSaveCurrentVal(pCtx, i, cts, pInputCol->info.type, data);
pResInfo->numOfRes = 1;
}
}
#endif
SET_VAL(pResInfo, numOfElems, 1); SET_VAL(pResInfo, numOfElems, 1);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -2801,6 +2820,8 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
int32_t blockDataOrder = (startKey <= endKey) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; int32_t blockDataOrder = (startKey <= endKey) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
// please ref. to the comment in lastRowFunction for the reason why disabling the opt version of last/first function.
#if 0
if (blockDataOrder == TSDB_ORDER_ASC) { if (blockDataOrder == TSDB_ORDER_ASC) {
for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) { for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) {
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) { if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
@ -2833,6 +2854,22 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
break; break;
} }
} }
#else
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
continue;
}
numOfElems++;
char* data = colDataGetData(pInputCol, i);
TSKEY cts = getRowPTs(pInput->pPTS, i);
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
doSaveCurrentVal(pCtx, i, cts, type, data);
pResInfo->numOfRes = 1;
}
}
#endif
SET_VAL(pResInfo, numOfElems, 1); SET_VAL(pResInfo, numOfElems, 1);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -2988,6 +3025,9 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
int32_t blockDataOrder = (startKey <= endKey) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; int32_t blockDataOrder = (startKey <= endKey) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
#if 0
// the optimized version only function if all tuples in one block are monotonious increasing or descreasing.
// this is NOT always works if project operator exists in downstream.
if (blockDataOrder == TSDB_ORDER_ASC) { if (blockDataOrder == TSDB_ORDER_ASC) {
for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) { for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) {
char* data = colDataGetData(pInputCol, i); char* data = colDataGetData(pInputCol, i);
@ -2997,6 +3037,7 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) { if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
doSaveLastrow(pCtx, data, i, cts, pInfo); doSaveLastrow(pCtx, data, i, cts, pInfo);
} }
break; break;
} }
} else { // descending order } else { // descending order
@ -3011,7 +3052,19 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
break; break;
} }
} }
#else
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
char* data = colDataGetData(pInputCol, i);
TSKEY cts = getRowPTs(pInput->pPTS, i);
numOfElems++;
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
doSaveLastrow(pCtx, data, i, cts, pInfo);
pResInfo->numOfRes = 1;
}
}
#endif
SET_VAL(pResInfo, numOfElems, 1); SET_VAL(pResInfo, numOfElems, 1);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -5926,6 +5979,7 @@ int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx) {
TSKEY cts = getRowPTs(pInput->pPTS, i); TSKEY cts = getRowPTs(pInput->pPTS, i);
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) { if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
doSaveLastrow(pCtx, data, i, cts, pInfo); doSaveLastrow(pCtx, data, i, cts, pInfo);
pResInfo->numOfRes = 1;
} }
} }

View File

@ -1746,20 +1746,14 @@ int32_t countScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam
SColumnInfoData *pOutputData = pOutput->columnData; SColumnInfoData *pOutputData = pOutput->columnData;
int64_t *out = (int64_t *)pOutputData->pData; int64_t *out = (int64_t *)pOutputData->pData;
bool hasNull = false;
*out = 0; *out = 0;
for (int32_t i = 0; i < pInput->numOfRows; ++i) { for (int32_t i = 0; i < pInput->numOfRows; ++i) {
if (colDataIsNull_s(pInputData, i)) { if (colDataIsNull_s(pInputData, i)) {
hasNull = true; continue;
break;
} }
(*out)++; (*out)++;
} }
if (hasNull) {
colDataAppendNULL(pOutputData, 0);
}
pOutput->numOfRows = 1; pOutput->numOfRows = 1;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -2414,6 +2408,10 @@ int32_t irateScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam
return nonCalcScalarFunction(pInput, inputNum, pOutput); return nonCalcScalarFunction(pInput, inputNum, pOutput);
} }
int32_t diffScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
return nonCalcScalarFunction(pInput, inputNum, pOutput);
}
int32_t twaScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { int32_t twaScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
return avgScalarFunction(pInput, inputNum, pOutput); return avgScalarFunction(pInput, inputNum, pOutput);
} }
@ -2421,3 +2419,11 @@ int32_t twaScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *
int32_t mavgScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { int32_t mavgScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
return avgScalarFunction(pInput, inputNum, pOutput); return avgScalarFunction(pInput, inputNum, pOutput);
} }
int32_t hllScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
return countScalarFunction(pInput, inputNum, pOutput);
}
int32_t csumScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
return sumScalarFunction(pInput, inputNum, pOutput);
}

View File

@ -223,6 +223,7 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode);
// snapshot -------------- // snapshot --------------
bool syncNodeHasSnapshot(SSyncNode* pSyncNode); bool syncNodeHasSnapshot(SSyncNode* pSyncNode);
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode);
SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode); SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode);
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode); SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode);

View File

@ -711,6 +711,9 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
syncNodeEventLog(ths, logBuf); syncNodeEventLog(ths, logBuf);
} while (0); } while (0);
// maybe update commit index by snapshot
syncNodeMaybeUpdateCommitBySnapshot(ths);
// prepare response msg // prepare response msg
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
pReply->srcId = ths->myRaftId; pReply->srcId = ths->myRaftId;
@ -718,7 +721,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
pReply->term = ths->pRaftStore->currentTerm; pReply->term = ths->pRaftStore->currentTerm;
pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
pReply->success = false; pReply->success = false;
pReply->matchIndex = SYNC_INDEX_INVALID; pReply->matchIndex = ths->commitIndex;
// msg event log // msg event log
do { do {

View File

@ -147,6 +147,15 @@ static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, Sync
int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
int32_t ret = 0; int32_t ret = 0;
// print log
do {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, term:%lu, match:%ld, success:%d", pMsg->term,
pMsg->matchIndex, pMsg->success);
syncNodeEventLog(ths, logBuf);
} while (0);
// if already drop replica, do not process // if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
syncNodeEventLog(ths, "recv sync-append-entries-reply, maybe replica already dropped"); syncNodeEventLog(ths, "recv sync-append-entries-reply, maybe replica already dropped");
@ -238,7 +247,14 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
SSnapshot oldSnapshot; SSnapshot oldSnapshot;
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &oldSnapshot); ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &oldSnapshot);
SyncTerm newSnapshotTerm = oldSnapshot.lastApplyTerm; SyncTerm newSnapshotTerm = oldSnapshot.lastApplyTerm;
syncNodeStartSnapshotOnce(ths, SYNC_INDEX_BEGIN, nextIndex, newSnapshotTerm, pMsg);
SyncIndex endIndex;
if (ths->pLogStore->syncLogExist(ths->pLogStore, nextIndex + 1)) {
endIndex = nextIndex;
} else {
endIndex = oldSnapshot.lastApplyIndex;
}
syncNodeStartSnapshotOnce(ths, pMsg->matchIndex + 1, endIndex, newSnapshotTerm, pMsg);
// get sender // get sender
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId)); SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId));
@ -256,6 +272,11 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
} }
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex); syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex);
SyncIndex oldMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
if (pMsg->matchIndex > oldMatchIndex) {
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex);
}
// event log, update next-index // event log, update next-index
do { do {
char host[64]; char host[64];

View File

@ -1083,6 +1083,17 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
return pSyncNode; return pSyncNode;
} }
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
SSnapshot snapshot;
int32_t code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
ASSERT(code == 0);
if (snapshot.lastApplyIndex > pSyncNode->commitIndex) {
pSyncNode->commitIndex = snapshot.lastApplyIndex;
}
}
}
void syncNodeStart(SSyncNode* pSyncNode) { void syncNodeStart(SSyncNode* pSyncNode) {
// start raft // start raft
if (pSyncNode->replicaNum == 1) { if (pSyncNode->replicaNum == 1) {

View File

@ -336,8 +336,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_NOT_EXIST, "Table does not exists
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TABLE_ACTION, "Invalid table action") TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TABLE_ACTION, "Invalid table action")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_ALREADY_EXISTS, "Table column already exists") TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_ALREADY_EXISTS, "Table column already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_COL_NOT_EXISTS, "Table column not exists") TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_COL_NOT_EXISTS, "Table column not exists")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_READ_END, "Read end")
// tsdb // tsdb
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID")

View File

@ -79,22 +79,39 @@ class TDSql:
self.queryResult = None self.queryResult = None
tdLog.info("sql:%s, expect error occured" % (sql)) tdLog.info("sql:%s, expect error occured" % (sql))
def query(self, sql, row_tag=None): def query(self, sql, row_tag=None,queyTimes=10):
self.sql = sql self.sql = sql
try: i=1
self.cursor.execute(sql) while i <= queyTimes:
self.queryResult = self.cursor.fetchall() try:
self.queryRows = len(self.queryResult) self.cursor.execute(sql)
self.queryCols = len(self.cursor.description) self.queryResult = self.cursor.fetchall()
except Exception as e: self.queryRows = len(self.queryResult)
caller = inspect.getframeinfo(inspect.stack()[1][0]) self.queryCols = len(self.cursor.description)
args = (caller.filename, caller.lineno, sql, repr(e)) if row_tag:
tdLog.notice("%s(%d) failed: sql:%s, %s" % args) return self.queryResult
traceback.print_exc() return self.queryRows
raise Exception(repr(e)) except Exception as e:
if row_tag: i+=1
return self.queryResult tdLog.notice("Try to query again, query times: %d "%i)
return self.queryRows pass
else:
try:
tdLog.notice("Try the last query ")
self.cursor.execute(sql)
self.queryResult = self.cursor.fetchall()
self.queryRows = len(self.queryResult)
self.queryCols = len(self.cursor.description)
if row_tag:
return self.queryResult
return self.queryRows
except Exception as e:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, sql, repr(e))
tdLog.notice("%s(%d) failed: sql:%s, %s" % args)
traceback.print_exc()
raise Exception(repr(e))
def is_err_sql(self, sql): def is_err_sql(self, sql):
err_flag = True err_flag = True
@ -266,16 +283,27 @@ class TDSql:
time.sleep(1) time.sleep(1)
continue continue
def execute(self, sql): def execute(self, sql,queyTimes=10):
self.sql = sql self.sql = sql
try: i=1
self.affectedRows = self.cursor.execute(sql) while i <= queyTimes:
except Exception as e: try:
caller = inspect.getframeinfo(inspect.stack()[1][0]) self.affectedRows = self.cursor.execute(sql)
args = (caller.filename, caller.lineno, sql, repr(e)) return self.affectedRows
tdLog.notice("%s(%d) failed: sql:%s, %s" % args) except Exception as e:
raise Exception(repr(e)) i+=1
return self.affectedRows tdLog.notice("Try to execute sql again, query times: %d "%i)
pass
else:
try:
tdLog.notice("Try the last execute sql ")
self.affectedRows = self.cursor.execute(sql)
return self.affectedRows
except Exception as e:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, sql, repr(e))
tdLog.notice("%s(%d) failed: sql:%s, %s" % args)
raise Exception(repr(e))
def checkAffectedRows(self, expectAffectedRows): def checkAffectedRows(self, expectAffectedRows):
if self.affectedRows != expectAffectedRows: if self.affectedRows != expectAffectedRows:

View File

@ -91,21 +91,21 @@ if $rows != $vgroups then
return -1 return -1
endi endi
if $data[0][4] == LEADER then if $data[0][4] == leader then
if $data[0][6] == FOLLOWER then if $data[0][6] == follower then
if $data[0][8] == FOLLOWER then if $data[0][8] == follower then
print ---- vgroup $data[0][0] leader locate on dnode $data[0][3] print ---- vgroup $data[0][0] leader locate on dnode $data[0][3]
endi endi
endi endi
elif $data[0][6] == LEADER then elif $data[0][6] == leader then
if $data[0][4] == FOLLOWER then if $data[0][4] == follower then
if $data[0][8] == FOLLOWER then if $data[0][8] == follower then
print ---- vgroup $data[0][0] leader locate on dnode $data[0][5] print ---- vgroup $data[0][0] leader locate on dnode $data[0][5]
endi endi
endi endi
elif $data[0][8] == LEADER then elif $data[0][8] == leader then
if $data[0][4] == FOLLOWER then if $data[0][4] == follower then
if $data[0][6] == FOLLOWER then if $data[0][6] == follower then
print ---- vgroup $data[0][0] leader locate on dnode $data[0][7] print ---- vgroup $data[0][0] leader locate on dnode $data[0][7]
endi endi
endi endi
@ -113,21 +113,21 @@ else
goto check_vg_ready goto check_vg_ready
endi endi
if $data[1][4] == LEADER then if $data[1][4] == leader then
if $data[1][6] == FOLLOWER then if $data[1][6] == follower then
if $data[1][8] == FOLLOWER then if $data[1][8] == follower then
print ---- vgroup $data[1][0] leader locate on dnode $data[1][3] print ---- vgroup $data[1][0] leader locate on dnode $data[1][3]
endi endi
endi endi
elif $data[1][6] == LEADER then elif $data[1][6] == leader then
if $data[1][4] == FOLLOWER then if $data[1][4] == follower then
if $data[1][8] == FOLLOWER then if $data[1][8] == follower then
print ---- vgroup $data[1][0] leader locate on dnode $data[1][5] print ---- vgroup $data[1][0] leader locate on dnode $data[1][5]
endi endi
endi endi
elif $data[1][8] == LEADER then elif $data[1][8] == leader then
if $data[1][4] == FOLLOWER then if $data[1][4] == follower then
if $data[1][6] == FOLLOWER then if $data[1][6] == follower then
print ---- vgroup $data[1][0] leader locate on dnode $data[1][7] print ---- vgroup $data[1][0] leader locate on dnode $data[1][7]
endi endi
endi endi
@ -135,21 +135,21 @@ else
goto check_vg_ready goto check_vg_ready
endi endi
if $data[2][4] == LEADER then if $data[2][4] == leader then
if $data[2][6] == FOLLOWER then if $data[2][6] == follower then
if $data[2][8] == FOLLOWER then if $data[2][8] == follower then
print ---- vgroup $data[2][0] leader locate on dnode $data[2][3] print ---- vgroup $data[2][0] leader locate on dnode $data[2][3]
endi endi
endi endi
elif $data[2][6] == LEADER then elif $data[2][6] == leader then
if $data[2][4] == FOLLOWER then if $data[2][4] == follower then
if $data[2][8] == FOLLOWER then if $data[2][8] == follower then
print ---- vgroup $data[2][0] leader locate on dnode $data[2][5] print ---- vgroup $data[2][0] leader locate on dnode $data[2][5]
endi endi
endi endi
elif $data[2][8] == LEADER then elif $data[2][8] == leader then
if $data[2][4] == FOLLOWER then if $data[2][4] == follower then
if $data[2][6] == FOLLOWER then if $data[2][6] == follower then
print ---- vgroup $data[2][0] leader locate on dnode $data[2][7] print ---- vgroup $data[2][0] leader locate on dnode $data[2][7]
endi endi
endi endi
@ -157,21 +157,21 @@ else
goto check_vg_ready goto check_vg_ready
endi endi
if $data[3][4] == LEADER then if $data[3][4] == leader then
if $data[3][6] == FOLLOWER then if $data[3][6] == follower then
if $data[3][8] == FOLLOWER then if $data[3][8] == follower then
print ---- vgroup $data[3][0] leader locate on dnode $data[3][3] print ---- vgroup $data[3][0] leader locate on dnode $data[3][3]
endi endi
endi endi
elif $data[3][6] == LEADER then elif $data[3][6] == leader then
if $data[3][4] == FOLLOWER then if $data[3][4] == follower then
if $data[3][8] == FOLLOWER then if $data[3][8] == follower then
print ---- vgroup $data[3][0] leader locate on dnode $data[3][5] print ---- vgroup $data[3][0] leader locate on dnode $data[3][5]
endi endi
endi endi
elif $data[3][8] == LEADER then elif $data[3][8] == leader then
if $data[3][4] == FOLLOWER then if $data[3][4] == follower then
if $data[3][6] == FOLLOWER then if $data[3][6] == follower then
print ---- vgroup $data[3][0] leader locate on dnode $data[3][7] print ---- vgroup $data[3][0] leader locate on dnode $data[3][7]
endi endi
endi endi
@ -179,21 +179,21 @@ else
goto check_vg_ready goto check_vg_ready
endi endi
if $data[4][4] == LEADER then if $data[4][4] == leader then
if $data[4][6] == FOLLOWER then if $data[4][6] == follower then
if $data[4][8] == FOLLOWER then if $data[4][8] == follower then
print ---- vgroup $data[4][0] leader locate on dnode $data[4][3] print ---- vgroup $data[4][0] leader locate on dnode $data[4][3]
endi endi
endi endi
elif $data[4][6] == LEADER then elif $data[4][6] == leader then
if $data[4][4] == FOLLOWER then if $data[4][4] == follower then
if $data[4][8] == FOLLOWER then if $data[4][8] == follower then
print ---- vgroup $data[4][0] leader locate on dnode $data[4][5] print ---- vgroup $data[4][0] leader locate on dnode $data[4][5]
endi endi
endi endi
elif $data[4][8] == LEADER then elif $data[4][8] == leader then
if $data[4][4] == FOLLOWER then if $data[4][4] == follower then
if $data[4][6] == FOLLOWER then if $data[4][6] == follower then
print ---- vgroup $data[4][0] leader locate on dnode $data[4][7] print ---- vgroup $data[4][0] leader locate on dnode $data[4][7]
endi endi
endi endi
@ -286,13 +286,13 @@ if $data[0][0] != 1 then
return -1 return -1
endi endi
if $data[0][2] != LEADER then if $data[0][2] != leader then
goto check_mnode_ready_2 goto check_mnode_ready_2
endi endi
if $data[1][2] != FOLLOWER then if $data[1][2] != follower then
goto check_mnode_ready_2 goto check_mnode_ready_2
endi endi
if $data[2][2] != FOLLOWER then if $data[2][2] != follower then
goto check_mnode_ready_2 goto check_mnode_ready_2
endi endi
@ -318,21 +318,21 @@ if $rows != $vgroups then
return -1 return -1
endi endi
if $data[0][4] == LEADER then if $data[0][4] == leader then
if $data[0][6] == FOLLOWER then if $data[0][6] == follower then
if $data[0][8] == FOLLOWER then if $data[0][8] == follower then
print ---- vgroup $data[0][0] leader locate on dnode $data[0][3] print ---- vgroup $data[0][0] leader locate on dnode $data[0][3]
endi endi
endi endi
elif $data[0][6] == LEADER then elif $data[0][6] == leader then
if $data[0][4] == FOLLOWER then if $data[0][4] == follower then
if $data[0][8] == FOLLOWER then if $data[0][8] == follower then
print ---- vgroup $data[0][0] leader locate on dnode $data[0][5] print ---- vgroup $data[0][0] leader locate on dnode $data[0][5]
endi endi
endi endi
elif $data[0][8] == LEADER then elif $data[0][8] == leader then
if $data[0][4] == FOLLOWER then if $data[0][4] == follower then
if $data[0][6] == FOLLOWER then if $data[0][6] == follower then
print ---- vgroup $data[0][0] leader locate on dnode $data[0][7] print ---- vgroup $data[0][0] leader locate on dnode $data[0][7]
endi endi
endi endi
@ -340,21 +340,21 @@ else
goto check_vg_ready1 goto check_vg_ready1
endi endi
if $data[1][4] == LEADER then if $data[1][4] == leader then
if $data[1][6] == FOLLOWER then if $data[1][6] == follower then
if $data[1][8] == FOLLOWER then if $data[1][8] == follower then
print ---- vgroup $data[1][0] leader locate on dnode $data[1][3] print ---- vgroup $data[1][0] leader locate on dnode $data[1][3]
endi endi
endi endi
elif $data[1][6] == LEADER then elif $data[1][6] == leader then
if $data[1][4] == FOLLOWER then if $data[1][4] == follower then
if $data[1][8] == FOLLOWER then if $data[1][8] == follower then
print ---- vgroup $data[1][0] leader locate on dnode $data[1][5] print ---- vgroup $data[1][0] leader locate on dnode $data[1][5]
endi endi
endi endi
elif $data[1][8] == LEADER then elif $data[1][8] == leader then
if $data[1][4] == FOLLOWER then if $data[1][4] == follower then
if $data[1][6] == FOLLOWER then if $data[1][6] == follower then
print ---- vgroup $data[1][0] leader locate on dnode $data[1][7] print ---- vgroup $data[1][0] leader locate on dnode $data[1][7]
endi endi
endi endi
@ -362,21 +362,21 @@ else
goto check_vg_ready1 goto check_vg_ready1
endi endi
if $data[2][4] == LEADER then if $data[2][4] == leader then
if $data[2][6] == FOLLOWER then if $data[2][6] == follower then
if $data[2][8] == FOLLOWER then if $data[2][8] == follower then
print ---- vgroup $data[2][0] leader locate on dnode $data[2][3] print ---- vgroup $data[2][0] leader locate on dnode $data[2][3]
endi endi
endi endi
elif $data[2][6] == LEADER then elif $data[2][6] == leader then
if $data[2][4] == FOLLOWER then if $data[2][4] == follower then
if $data[2][8] == FOLLOWER then if $data[2][8] == follower then
print ---- vgroup $data[2][0] leader locate on dnode $data[2][5] print ---- vgroup $data[2][0] leader locate on dnode $data[2][5]
endi endi
endi endi
elif $data[2][8] == LEADER then elif $data[2][8] == leader then
if $data[2][4] == FOLLOWER then if $data[2][4] == follower then
if $data[2][6] == FOLLOWER then if $data[2][6] == follower then
print ---- vgroup $data[2][0] leader locate on dnode $data[2][7] print ---- vgroup $data[2][0] leader locate on dnode $data[2][7]
endi endi
endi endi
@ -384,21 +384,21 @@ else
goto check_vg_ready1 goto check_vg_ready1
endi endi
if $data[3][4] == LEADER then if $data[3][4] == leader then
if $data[3][6] == FOLLOWER then if $data[3][6] == follower then
if $data[3][8] == FOLLOWER then if $data[3][8] == follower then
print ---- vgroup $data[3][0] leader locate on dnode $data[3][3] print ---- vgroup $data[3][0] leader locate on dnode $data[3][3]
endi endi
endi endi
elif $data[3][6] == LEADER then elif $data[3][6] == leader then
if $data[3][4] == FOLLOWER then if $data[3][4] == follower then
if $data[3][8] == FOLLOWER then if $data[3][8] == follower then
print ---- vgroup $data[3][0] leader locate on dnode $data[3][5] print ---- vgroup $data[3][0] leader locate on dnode $data[3][5]
endi endi
endi endi
elif $data[3][8] == LEADER then elif $data[3][8] == leader then
if $data[3][4] == FOLLOWER then if $data[3][4] == follower then
if $data[3][6] == FOLLOWER then if $data[3][6] == follower then
print ---- vgroup $data[3][0] leader locate on dnode $data[3][7] print ---- vgroup $data[3][0] leader locate on dnode $data[3][7]
endi endi
endi endi
@ -406,21 +406,21 @@ else
goto check_vg_ready1 goto check_vg_ready1
endi endi
if $data[4][4] == LEADER then if $data[4][4] == leader then
if $data[4][6] == FOLLOWER then if $data[4][6] == follower then
if $data[4][8] == FOLLOWER then if $data[4][8] == follower then
print ---- vgroup $data[4][0] leader locate on dnode $data[4][3] print ---- vgroup $data[4][0] leader locate on dnode $data[4][3]
endi endi
endi endi
elif $data[4][6] == LEADER then elif $data[4][6] == leader then
if $data[4][4] == FOLLOWER then if $data[4][4] == follower then
if $data[4][8] == FOLLOWER then if $data[4][8] == follower then
print ---- vgroup $data[4][0] leader locate on dnode $data[4][5] print ---- vgroup $data[4][0] leader locate on dnode $data[4][5]
endi endi
endi endi
elif $data[4][8] == LEADER then elif $data[4][8] == leader then
if $data[4][4] == FOLLOWER then if $data[4][4] == follower then
if $data[4][6] == FOLLOWER then if $data[4][6] == follower then
print ---- vgroup $data[4][0] leader locate on dnode $data[4][7] print ---- vgroup $data[4][0] leader locate on dnode $data[4][7]
endi endi
endi endi
@ -539,27 +539,27 @@ if $data[0][0] != 1 then
return -1 return -1
endi endi
if $data[0][2] == LEADER then if $data[0][2] == leader then
if $data[1][2] != FOLLOWER then if $data[1][2] != follower then
goto check_mnode_ready_3 goto check_mnode_ready_3
endi endi
if $data[2][2] != FOLLOWER then if $data[2][2] != follower then
goto check_mnode_ready_3 goto check_mnode_ready_3
endi endi
endi endi
if $data[1][2] == LEADER then if $data[1][2] == leader then
if $data[0][2] != FOLLOWER then if $data[0][2] != follower then
goto check_mnode_ready_3 goto check_mnode_ready_3
endi endi
if $data[2][2] != FOLLOWER then if $data[2][2] != follower then
goto check_mnode_ready_3 goto check_mnode_ready_3
endi endi
endi endi
if $data[2][2] == LEADER then if $data[2][2] == leader then
if $data[1][2] != FOLLOWER then if $data[1][2] != follower then
goto check_mnode_ready_3 goto check_mnode_ready_3
endi endi
if $data[0][2] != FOLLOWER then if $data[0][2] != follower then
goto check_mnode_ready_3 goto check_mnode_ready_3
endi endi
endi endi

View File

@ -170,84 +170,3 @@ if $rows != 100 then
return -1 return -1
endi endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec.sh -n dnode4 -s stop -x SIGINT
########################################################
########################################################
print ===> start dnode1 dnode3 dnode4
system sh/exec.sh -n dnode1 -s start
#system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
sleep 7000
print =============== query data
sql connect
sql use db
sql select * from ct1
print rows: $rows
print $data00 $data01 $data02
if $rows != 100 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
#system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec.sh -n dnode4 -s stop -x SIGINT
########################################################
########################################################
print ===> start dnode1 dnode2 dnode4
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
#system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
sleep 3000
print =============== query data
sql select * from ct1
print rows: $rows
print $data00 $data01 $data02
if $rows != 100 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
#system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec.sh -n dnode4 -s stop -x SIGINT
########################################################
########################################################
print ===> start dnode1 dnode2 dnode3
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
#system sh/exec.sh -n dnode4 -s start
sleep 3000
print =============== query data
sql select * from ct1
print rows: $rows
print $data00 $data01 $data02
if $rows != 100 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
#system sh/exec.sh -n dnode4 -s stop -x SIGINT
########################################################

View File

@ -1,13 +1,10 @@
system sh/stop_dnodes.sh system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1 system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2 system sh/cfg.sh -n dnode1 -c debugflag -v 131
system sh/exec.sh -n dnode1 -s start -v system sh/exec.sh -n dnode1 -s start -v
system sh/exec.sh -n dnode2 -s start -v
sql connect sql connect
print =============== add dnode2 into cluster print =============== step1: create drop show dnodes
sql create dnode $hostname port 7200
$x = 0 $x = 0
step1: step1:
$x = $x + 1 $x = $x + 1
@ -18,135 +15,59 @@ step1:
endi endi
sql show dnodes sql show dnodes
print ---> $data00 $data01 $data02 $data03 $data04 $data05 print ---> $data00 $data01 $data02 $data03 $data04 $data05
print ---> $data10 $data11 $data12 $data13 $data14 $data15 if $rows != 1 then
if $rows != 2 then
return -1 return -1
endi endi
if $data(1)[4] != ready then if $data(1)[4] != ready then
goto step1 goto step1
endi endi
if $data(2)[4] != ready then
goto step1
endi
print =============== create database, stable, table
sql create database db vgroups 3
sql use db
sql create table stb (ts timestamp, c int) tags (t int)
sql create table t0 using stb tags (0)
sql create table tba (ts timestamp, c1 binary(10), c2 nchar(10));
print =============== run show xxxx
sql show dnodes
if $rows != 2 then
return -1
endi
sql show mnodes
if $rows != 1 then
return -1
endi
print =============== step2: create db
sql create database d1 vgroups 2 buffer 3
sql show databases sql show databases
if $rows != 3 then sql use d1
return -1 sql show vgroups
endi
print =============== step3: create show stable
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int unsigned)
sql show stables sql show stables
if $rows != 1 then if $rows != 1 then
return -1 return -1
endi endi
print =============== step4: create show table
sql create table ct1 using stb tags(1000)
sql create table ct2 using stb tags(2000)
sql create table ct3 using stb tags(3000)
sql show tables sql show tables
if $rows != 2 then
return -1
endi
sql show users
if $rows != 1 then
return -1
endi
sql show vgroups
if $rows != 3 then if $rows != 3 then
return -1 return -1
endi endi
print =============== run select * from information_schema.xxxx print =============== step5: insert data
sql select * from information_schema.`dnodes` sql insert into ct1 values(now+0d, 10, 2.0, 3.0)
if $rows != 2 then sql insert into ct1 values(now+1d, 11, 2.1, 3.1)(now+2d, -12, -2.2, -3.2)(now+3d, -13, -2.3, -3.3)
return -1 sql insert into ct2 values(now+0d, 10, 2.0, 3.0)
endi sql insert into ct2 values(now+1d, 11, 2.1, 3.1)(now+2d, -12, -2.2, -3.2)(now+3d, -13, -2.3, -3.3)
sql insert into ct3 values('2022-01-01 00:00:00.000', 10, 2.0, 3.0)
sql select * from information_schema.`mnodes` print =============== step6: query data
if $rows != 1 then sql select * from ct1 where ts < now -1d and ts > now +1d
return -1 sql select * from stb where ts < now -1d and ts > now +1d
endi sql select * from ct1 where ts < now -1d and ts > now +1d order by ts desc
sql select * from stb where ts < now -1d and ts > now +1d order by ts desc
sql select * from information_schema.user_databases _OVER:
if $rows != 3 then
return -1
endi
sql select * from information_schema.user_stables
if $rows != 1 then
return -1
endi
sql select * from information_schema.user_tables
if $rows != 31 then
return -1
endi
sql select * from information_schema.user_users
if $rows != 1 then
return -1
endi
sql select * from information_schema.`vgroups`
if $rows != 3 then
return -1
endi
sql show variables;
if $rows != 4 then
return -1
endi
sql show dnode 1 variables;
if $rows <= 0 then
return -1
endi
sql show local variables;
if $rows <= 0 then
return -1
endi
print ==== stop dnode1 and dnode2, and restart dnodes
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT print =============== check
$null=
print =============== check dnode1
system_content sh/checkValgrind.sh -n dnode1 system_content sh/checkValgrind.sh -n dnode1
print cmd return result ----> [ $system_content ] print cmd return result ----> [ $system_content ]
if $system_content <= 0 then if $system_content > 1 then
return 0 return -1
endi endi
$null=
if $system_content == $null then if $system_content == $null then
return 0 return -1
endi
print =============== check dnode2
system_content sh/checkValgrind.sh -n dnode2
print cmd return result ----> [ $system_content ]
if $system_content <= 0 then
return 0
endi
$null=
if $system_content == $null then
return 0
endi endi

View File

@ -23,7 +23,7 @@ if $data(1)[4] != ready then
endi endi
print =============== step2: create db print =============== step2: create db
sql create database d1 vgroups 1 buffer 3 sql create database d1 vgroups 2 buffer 3
sql show databases sql show databases
sql use d1 sql use d1
sql show vgroups sql show vgroups
@ -44,24 +44,32 @@ if $rows != 3 then
return -1 return -1
endi endi
print =============== step5: insert data print =============== step5: insert data (null / update)
sql insert into ct1 values(now+0s, 10, 2.0, 3.0) sql insert into ct1 values(now+0s, 10, 2.0, 3.0)
sql insert into ct1 values(now+1s, 11, 2.1, 3.1)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3) sql insert into ct1 values(now+1s, 11, 2.1, NULL)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3)
sql insert into ct2 values(now+0s, 10, 2.0, 3.0) sql insert into ct2 values(now+0s, 10, 2.0, 3.0)
sql insert into ct2 values(now+1s, 11, 2.1, 3.1)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3) sql insert into ct2 values(now+1s, 11, 2.1, 3.1)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3)
sql insert into ct3 values('2021-01-01 00:00:00.000', 10, 2.0, 3.0) sql insert into ct3 values('2021-01-01 00:00:00.000', NULL, NULL, 3.0)
sql insert into ct3 values('2022-03-02 16:59:00.010', 3 , 4, 5), ('2022-03-02 16:59:00.010', 33 , 4, 5), ('2022-04-01 16:59:00.011', 4, 4, 5), ('2022-04-01 16:59:00.011', 6, 4, 5), ('2022-03-06 16:59:00.013', 8, 4, 5);
sql insert into ct3 values('2022-03-02 16:59:00.010', 103, 1, 2), ('2022-03-02 16:59:00.010', 303, 3, 4), ('2022-04-01 16:59:00.011', 40, 5, 6), ('2022-04-01 16:59:00.011', 60, 4, 5), ('2022-03-06 16:59:00.013', 80, 4, 5);
print =============== step6: query data print =============== step6: query data
sql select * from ct1 sql select * from ct1
sql select * from stb sql select * from stb
sql select c1, c2, c3 from ct1 sql select c1, c2, c3 from ct1
sql select ts, c1, c2, c3 from stb sql select ts, c1, c2, c3 from stb
sql select * from ct1 where ts < now -1d and ts > now +1d
sql select * from stb where ts < now -1d and ts > now +1d
#sql select * from ct1 where ts < now -1d and ts > now +1d order by ts desc
#sql select * from stb where ts < now -1d and ts > now +1d order by ts desc
print =============== step7: count print =============== step7: count
sql select count(*) from ct1; sql select count(*) from ct1;
sql select count(*) from stb; sql select count(*) from stb;
sql select count(ts), count(c1), count(c2), count(c3) from ct1 sql select count(ts), count(c1), count(c2), count(c3) from ct1
sql select count(ts), count(c1), count(c2), count(c3) from stb sql select count(ts), count(c1), count(c2), count(c3) from stb
sql select count(*) from ct1 where ts < now -1d and ts > now +1d
sql select count(*) from stb where ts < now -1d and ts > now +1d
print =============== step8: func print =============== step8: func
sql select first(ts), first(c1), first(c2), first(c3) from ct1 sql select first(ts), first(c1), first(c2), first(c3) from ct1
@ -69,6 +77,20 @@ sql select min(c1), min(c2), min(c3) from ct1
sql select max(c1), max(c2), max(c3) from ct1 sql select max(c1), max(c2), max(c3) from ct1
sql select sum(c1), sum(c2), sum(c3) from ct1 sql select sum(c1), sum(c2), sum(c3) from ct1
print =============== step9: insert select
#sql create table ct4 using stb tags(4000);
#sql insert into ct4 select * from ct1;
#sql select * from ct4;
#sql insert into ct4 select ts,c1,c2,c3 from stb;
#sql create table tb1 (ts timestamp, c1 int, c2 float, c3 double);
#sql insert into tb1 (ts, c1, c2, c3) select * from ct1;
#sql select * from tb1;
#sql create table tb2 (ts timestamp, f1 binary(10), c1 int, c2 double);
#sql insert into tb2 (c2, c1, ts) select c2+1, c1, ts+3 from ct2;
#sql select * from tb2;
_OVER: _OVER:
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
print =============== check print =============== check

View File

@ -0,0 +1,63 @@
import taos
import sys
import datetime
import inspect
from util.log import *
from util.sql import *
from util.cases import *
import random
class TDTestCase:
updatecfgDict = {'debugFlag': 143, "cDebugFlag": 143, "uDebugFlag": 143, "rpcDebugFlag": 143, "tmrDebugFlag": 143,
"jniDebugFlag": 143, "simDebugFlag": 143, "dDebugFlag": 143, "dDebugFlag": 143, "vDebugFlag": 143, "mDebugFlag": 143, "qDebugFlag": 143,
"wDebugFlag": 143, "sDebugFlag": 143, "tsdbDebugFlag": 143, "tqDebugFlag": 143, "fsDebugFlag": 143, "udfDebugFlag": 143}
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def case1(self):
tdSql.execute("create database if not exists dbms precision 'ms'")
tdSql.execute("create database if not exists dbus precision 'us'")
tdSql.execute("create database if not exists dbns precision 'ns'")
tdSql.execute("create table dbms.ntb (ts timestamp, c1 int, c2 bigint)")
tdSql.execute("create table dbus.ntb (ts timestamp, c1 int, c2 bigint)")
tdSql.execute("create table dbns.ntb (ts timestamp, c1 int, c2 bigint)")
tdSql.execute("insert into dbms.ntb values ('2022-01-01 08:00:00.001', 1, 2)")
tdSql.execute("insert into dbms.ntb values ('2022-01-01 08:00:00.002', 3, 4)")
tdSql.execute("insert into dbus.ntb values ('2022-01-01 08:00:00.000001', 1, 2)")
tdSql.execute("insert into dbus.ntb values ('2022-01-01 08:00:00.000002', 3, 4)")
tdSql.execute("insert into dbns.ntb values ('2022-01-01 08:00:00.000000001', 1, 2)")
tdSql.execute("insert into dbns.ntb values ('2022-01-01 08:00:00.000000002', 3, 4)")
tdSql.query("select count(c1) from dbms.ntb interval(1a)")
tdSql.checkRows(2)
tdSql.query("select count(c1) from dbus.ntb interval(1u)")
tdSql.checkRows(2)
tdSql.query("select count(c1) from dbns.ntb interval(1b)")
tdSql.checkRows(2)
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql.prepare()
tdLog.printNoPrefix("==========start case1 run ...............")
self.case1()
tdLog.printNoPrefix("==========end case1 run ...............")
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -392,6 +392,31 @@ class TDTestCase:
tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;") tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;")
assert unionallQnode==tdSql.queryResult assert unionallQnode==tdSql.queryResult
queryPolicy=1
tdSql.execute('alter local "queryPolicy" "%d"'%queryPolicy)
tdSql.query("show local variables;")
for i in range(tdSql.queryRows):
if tdSql.queryResult[i][0] == "queryPolicy" :
if int(tdSql.queryResult[i][1]) == int(queryPolicy):
tdLog.success('alter queryPolicy to %d successfully'%queryPolicy)
else :
tdLog.debug(tdSql.queryResult)
tdLog.exit("alter queryPolicy to %d failed"%queryPolicy)
tdSql.execute("reset query cache")
tdSql.execute("use db1;")
tdSql.query("show dnodes;")
dnodeId=tdSql.getData(0,0)
tdSql.query("select max(c1) from stb10;")
assert maxQnode==tdSql.getData(0,0)
tdSql.query("select min(c1) from stb11;")
assert minQnode==tdSql.getData(0,0)
tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union select c0,c1 from stb11_1 where c0>2000;")
assert unionQnode==tdSql.queryResult
tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;")
assert unionallQnode==tdSql.queryResult
# test case : queryPolicy = 2 # test case : queryPolicy = 2
def test_case2(self): def test_case2(self):
self.taosBenchCreate("127.0.0.1","no","db1", "stb1", 10, 2, 1*10) self.taosBenchCreate("127.0.0.1","no","db1", "stb1", 10, 2, 1*10)

View File

@ -40,9 +40,9 @@ class TDTestCase:
f"insert into rct{j} values ( {ts+i*10000}, {80+i}, {90+i}, {85+i}, {30+i*10}, {1.2*i}, {221+i*2}, {20+i*0.2}, {1500+i*20}, {150+i*2},{5+i} )" f"insert into rct{j} values ( {ts+i*10000}, {80+i}, {90+i}, {85+i}, {30+i*10}, {1.2*i}, {221+i*2}, {20+i*0.2}, {1500+i*20}, {150+i*2},{5+i} )"
) )
tdSql.execute( tdSql.execute(
f"insert into dct{j} values ( {ts+i*10000}, {1+i*0.1},{1400+i*15}, {1+i},{1500+i*20}, {150+i*2},{5+i} )" f"insert into dct{j} values ( {ts+i*10000}, {1+i*0.1},{1400+i*15}, {i},{1500+i*20}, {150+i*2},{5+i} )"
) )
tdSql.execute("insert into dct9 (ts,fuel_state) values('2021-07-13 14:06:33.123Z',1.2) ;")
# def check_avg(self ,origin_query , check_query): # def check_avg(self ,origin_query , check_query):
# avg_result = tdSql.getResult(origin_query) # avg_result = tdSql.getResult(origin_query)
# origin_result = tdSql.getResult(check_query) # origin_result = tdSql.getResult(check_query)
@ -60,13 +60,15 @@ class TDTestCase:
def tsbsIotQuery(self): def tsbsIotQuery(self):
tdSql.execute("use db_tsbs") tdSql.execute("use db_tsbs")
# test interval and partition # test interval and partition
tdSql.query(" SELECT avg(velocity) as mean_velocity ,name,driver,fleet FROM readings WHERE ts > 1451606400000 AND ts <= 1451606460000 partition BY name,driver,fleet; ") tdSql.query(" SELECT avg(velocity) as mean_velocity ,name,driver,fleet FROM readings WHERE ts > 1451606400000 AND ts <= 1451606460000 partition BY name,driver,fleet; ")
print(tdSql.queryResult)
parRows=tdSql.queryRows parRows=tdSql.queryRows
tdSql.query(" SELECT avg(velocity) as mean_velocity ,name,driver,fleet FROM readings WHERE ts > 1451606400000 AND ts <= 1451606460000 partition BY name,driver,fleet interval(10m); ") tdSql.query(" SELECT avg(velocity) as mean_velocity ,name,driver,fleet FROM readings WHERE ts > 1451606400000 AND ts <= 1451606460000 partition BY name,driver,fleet interval(10m); ")
# tdSql.checkRows(parRows) tdSql.checkRows(parRows)
# test insert into # test insert into
@ -77,18 +79,53 @@ class TDTestCase:
# test paitition interval fill # test paitition interval fill
# tdSql.query("SELECT name,floor(avg(velocity)/10)/floor(avg(velocity)/10) AS mv FROM readings WHERE name!='' AND ts > '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by name interval(10m) fill(value,0) ;") tdSql.query("SELECT name,floor(avg(velocity)/10)/floor(avg(velocity)/10) AS mv FROM readings WHERE name!='' AND ts > '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by name interval(10m) fill(value,0) ;")
# # test partition interval limit # test partition interval limit (PRcore-TD-17410)
# tdSql.query("SELECT ts,model,floor(2*(sum(nzs)/count(nzs)))/floor(2*(sum(nzs)/count(nzs))) AS broken_down FROM (SELECT ts,model, status/status AS nzs FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' ) WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition BY model,ts interval(10m) limit 10;") # tdSql.query("select name,driver from (SELECT name,driver,fleet ,avg(velocity) as mean_velocity FROM readings partition BY name,driver,fleet interval (10m) limit 1);")
# tdSql.checkRows(10) # tdSql.checkRows(10)
# test partition interval Pseudo time-column # test partition interval Pseudo time-column
tdSql.query("SELECT count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m)) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1;") tdSql.query("SELECT count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m)) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1;")
# 1 high-load:
# tdSql.query("SELECT ts,name,driver,current_load,load_capacity FROM (SELECT last(ts) as ts,name,driver, current_load,load_capacity FROM diagnostics WHERE fleet = 'South' partition by name,driver) WHERE current_load>= (0.9 * load_capacity) partition by name ORDER BY name desc, ts DESC;")
# tdSql.query("SELECT ts,name,driver,current_load,load_capacity FROM (SELECT last(ts) as ts,name,driver, current_load,load_capacity FROM diagnostics WHERE fleet = 'South' partition by name,driver) WHERE current_load>= (0.9 * load_capacity) partition by name ORDER BY name ;")
# 2 stationary-trucks
tdSql.query("select name,driver from (SELECT name,driver,fleet ,avg(velocity) as mean_velocity FROM readings WHERE ts > '2016-01-01T15:07:21Z' AND ts <= '2016-01-01T16:17:21Z' partition BY name,driver,fleet interval(10m) LIMIT 1)")
tdSql.query("select name,driver from (SELECT name,driver,fleet ,avg(velocity) as mean_velocity FROM readings WHERE ts > '2016-01-01T15:07:21Z' AND ts <= '2016-01-01T16:17:21Z' partition BY name,driver,fleet interval(10m) LIMIT 1) WHERE fleet = 'West' AND mean_velocity < 1000 partition BY name")
# 3 long-driving-sessions
# tdSql.query("SELECT name,driver FROM(SELECT name,driver,count(*) AS ten_min FROM(SELECT _wstart as ts,name,driver,avg(velocity) as mean_velocity FROM readings where ts > '2016-01-01T00:00:34Z' AND ts <= '2016-01-01T04:00:34Z' partition BY name,driver interval(10m)) WHERE mean_velocity > 1 GROUP BY name,driver) WHERE ten_min > 22 ;")
#4 long-daily-sessions
tdSql.query("SELECT name,driver FROM(SELECT name,driver,count(*) AS ten_min FROM(SELECT name,driver,avg(velocity) as mean_velocity FROM readings WHERE fleet ='West' AND ts > '2016-01-01T12:31:37Z' AND ts <= '2016-01-05T12:31:37Z' partition BY name,driver interval(10m) ) WHERE mean_velocity > 1 GROUP BY name,driver) WHERE ten_min > 60")
# 5. avg-daily-driving-duration
tdSql.query("select _wstart as ts,fleet,name,driver,count(mv)/6 as hours_driven from ( select _wstart as ts,fleet,name,driver,avg(velocity) as mv from readings where ts > '2016-01-01T00:00:00Z' and ts < '2016-01-05T00:00:01Z' partition by fleet,name,driver interval(10m)) where ts > '2016-01-01T00:00:00Z' and ts < '2016-01-05T00:00:01Z' partition by fleet,name,driver interval(1d) ;")
# 6. avg-daily-driving-session
#taosc core dumped
tdSql.execute("create table random_measure2_1 (ts timestamp,ela float, name binary(40))")
tdSql.query("SELECT ts,diff(mv) AS difka FROM (SELECT ts,name,floor(avg(velocity)/10)/floor(avg(velocity)/10) AS mv FROM readings WHERE name!='' AND ts > '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by name,ts interval(10m) fill(value,0)) GROUP BY name,ts;")
tdSql.query("SELECT _wstart,name,floor(avg(velocity)/10)/floor(avg(velocity)/10) AS mv FROM readings WHERE name!='' AND ts > '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by name interval(10m) fill(value,0)")
# 7. avg-load
tdSql.query("SELECT fleet, model,avg(ml) AS mean_load_percentage FROM (SELECT fleet, model,current_load/load_capacity AS ml FROM diagnostics partition BY name, fleet, model) partition BY fleet, model order by fleet ;")
# 8. daily-activity
tdSql.query(" SELECT model,ms1 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) fill(value,0)) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1;")
tdSql.query("SELECT _wstart,model,fleet,count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) fill(value,0)) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1 partition by model, fleet interval(1d) ;")
#it's already supported:
# last-loc
tdSql.query("")
# test
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdLog.printNoPrefix("==========step1:create database and table,insert data ==============") tdLog.printNoPrefix("==========step1:create database and table,insert data ==============")
self.prepareData() self.prepareData()

View File

@ -29,6 +29,7 @@ python3 ./test.py -f 1-insert/block_wise.py
python3 ./test.py -f 1-insert/create_retentions.py python3 ./test.py -f 1-insert/create_retentions.py
python3 ./test.py -f 1-insert/table_param_ttl.py python3 ./test.py -f 1-insert/table_param_ttl.py
python3 ./test.py -f 2-query/db.py
python3 ./test.py -f 2-query/between.py python3 ./test.py -f 2-query/between.py
python3 ./test.py -f 2-query/distinct.py python3 ./test.py -f 2-query/distinct.py
python3 ./test.py -f 2-query/varchar.py python3 ./test.py -f 2-query/varchar.py

@ -1 +1 @@
Subproject commit d807c3ffa6f750f7765e102917d1328cadf21c13 Subproject commit bd496f76b64931c66da2f8b0f24143a98a881cde

@ -1 +1 @@
Subproject commit c5fded266d3b10508e38bf3285bb7ecf798bc343 Subproject commit 7a94ffab45f08e16f09b3f430fe75d717054adb6