diff --git a/include/libs/scalar/scalar.h b/include/libs/scalar/scalar.h index 5cbe380234..dfdb69ee3c 100644 --- a/include/libs/scalar/scalar.h +++ b/include/libs/scalar/scalar.h @@ -111,6 +111,9 @@ int32_t derivativeScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalar int32_t irateScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t twaScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t mavgScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t hllScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t csumScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t diffScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); #ifdef __cplusplus } diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 7e61456d45..d0321d3724 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -332,7 +332,6 @@ int32_t* taosGetErrno(); #define TSDB_CODE_VND_INVALID_TABLE_ACTION TAOS_DEF_ERROR_CODE(0, 0x0519) #define TSDB_CODE_VND_COL_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051a) #define TSDB_CODE_VND_TABLE_COL_NOT_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051b) -#define TSDB_CODE_VND_READ_END TAOS_DEF_ERROR_CODE(0, 0x051c) // tsdb #define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 62052457fd..77b31011a3 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -142,6 +142,7 @@ void taos_close(TAOS *taos) { int taos_errno(TAOS_RES *res) { if (res == NULL || TD_RES_TMQ_META(res)) { + if (terrno == TSDB_CODE_RPC_REDIRECT) terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL; return terrno; } @@ -149,11 +150,13 @@ int taos_errno(TAOS_RES *res) { return 0; } - return ((SRequestObj *)res)->code; + return ((SRequestObj *)res)->code == TSDB_CODE_RPC_REDIRECT ? TSDB_CODE_RPC_NETWORK_UNAVAIL + : ((SRequestObj *)res)->code; } const char *taos_errstr(TAOS_RES *res) { if (res == NULL || TD_RES_TMQ_META(res)) { + if (terrno == TSDB_CODE_RPC_REDIRECT) terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL; return (const char *)tstrerror(terrno); } @@ -165,7 +168,8 @@ const char *taos_errstr(TAOS_RES *res) { if (NULL != pRequest->msgBuf && (strlen(pRequest->msgBuf) > 0 || pRequest->code == TSDB_CODE_RPC_FQDN_ERROR)) { return pRequest->msgBuf; } else { - return (const char *)tstrerror(pRequest->code); + return pRequest->code == TSDB_CODE_RPC_REDIRECT ? (const char *)tstrerror(TSDB_CODE_RPC_NETWORK_UNAVAIL) + : (const char *)tstrerror(pRequest->code); } } diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index f0a6b6505d..6b5f795eb0 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -138,7 +138,7 @@ void *tsdbGetIdx(SMeta *pMeta); void *tsdbGetIvtIdx(SMeta *pMeta); int32_t tsdbLastRowReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, void **pReader); -int32_t tsdbRetrieveLastRow(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray* pTableUids); +int32_t tsdbRetrieveLastRow(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids); int32_t tsdbLastrowReaderClose(void *pReader); int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 4df9f96514..3f667a7465 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -24,12 +24,12 @@ extern "C" { // tsdbDebug ================ // clang-format off -#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TSDB FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) -#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TSDB ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) -#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TSDB WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0) -#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TSDB ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) -#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSDB ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0) -#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) +#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TSD FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) +#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TSD ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) +#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TSD WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0) +#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TSD ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) +#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSD ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0) +#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSD ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) // clang-format on typedef struct TSDBROW TSDBROW; @@ -115,7 +115,6 @@ int32_t tGetBlock(uint8_t *p, void *ph); int32_t tBlockCmprFn(const void *p1, const void *p2); bool tBlockHasSma(SBlock *pBlock); // SBlockIdx -void tBlockIdxReset(SBlockIdx *pBlockIdx); int32_t tPutBlockIdx(uint8_t *p, void *ph); int32_t tGetBlockIdx(uint8_t *p, void *ph); int32_t tCmprBlockIdx(void const *lhs, void const *rhs); @@ -126,6 +125,8 @@ void tColDataClear(void *ph); int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal); int32_t tColDataGetValue(SColData *pColData, int32_t iRow, SColVal *pColVal); int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest); +int32_t tPutColData(uint8_t *p, SColData *pColData); +int32_t tGetColData(uint8_t *p, SColData *pColData); // SBlockData #define tBlockDataFirstRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, 0) #define tBlockDataLastRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, (PBLOCKDATA)->nRow - 1) @@ -134,14 +135,17 @@ int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest); int32_t tBlockDataInit(SBlockData *pBlockData); void tBlockDataReset(SBlockData *pBlockData); int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema); +int32_t tBlockDataCorrectSchema(SBlockData *pBlockData, SBlockData *pBlockDataFrom); void tBlockDataClearData(SBlockData *pBlockData); -void tBlockDataClear(SBlockData *pBlockData); +void tBlockDataClear(SBlockData *pBlockData, int8_t deepClear); int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData); int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema); int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData); int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest); SColData *tBlockDataGetColDataByIdx(SBlockData *pBlockData, int32_t idx); void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData); +int32_t tPutBlockData(uint8_t *p, SBlockData *pBlockData); +int32_t tGetBlockData(uint8_t *p, SBlockData *pBlockData); // SDelIdx int32_t tPutDelIdx(uint8_t *p, void *ph); int32_t tGetDelIdx(uint8_t *p, void *ph); @@ -202,7 +206,7 @@ int32_t tsdbFSStateUpsertDelFile(STsdbFSState *pState, SDelFile *pDelFile); int32_t tsdbFSStateUpsertDFileSet(STsdbFSState *pState, SDFileSet *pSet); void tsdbFSStateDeleteDFileSet(STsdbFSState *pState, int32_t fid); SDelFile *tsdbFSStateGetDelFile(STsdbFSState *pState); -SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid); +SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid, int32_t flag); // tsdbReaderWriter.c ============================================================================================== // SDataFWriter int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet); @@ -357,10 +361,6 @@ struct TSDBROW { struct SBlockIdx { int64_t suid; int64_t uid; - TSKEY minKey; - TSKEY maxKey; - int64_t minVersion; - int64_t maxVersion; int64_t offset; int64_t size; }; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 9186b4794a..8b5487f170 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -312,6 +312,7 @@ void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data); struct SSnapDataHdr { int8_t type; + int64_t index; int64_t size; uint8_t data[]; }; diff --git a/source/dnode/vnode/src/meta/metaSnapshot.c b/source/dnode/vnode/src/meta/metaSnapshot.c index ac84842e85..85261d302e 100644 --- a/source/dnode/vnode/src/meta/metaSnapshot.c +++ b/source/dnode/vnode/src/meta/metaSnapshot.c @@ -26,60 +26,72 @@ struct SMetaSnapReader { int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader) { int32_t code = 0; int32_t c = 0; - SMetaSnapReader* pMetaSnapReader = NULL; + SMetaSnapReader* pReader = NULL; // alloc - pMetaSnapReader = (SMetaSnapReader*)taosMemoryCalloc(1, sizeof(*pMetaSnapReader)); - if (pMetaSnapReader == NULL) { + pReader = (SMetaSnapReader*)taosMemoryCalloc(1, sizeof(*pReader)); + if (pReader == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - pMetaSnapReader->pMeta = pMeta; - pMetaSnapReader->sver = sver; - pMetaSnapReader->ever = ever; + pReader->pMeta = pMeta; + pReader->sver = sver; + pReader->ever = ever; // impl - code = tdbTbcOpen(pMeta->pTbDb, &pMetaSnapReader->pTbc, NULL); + code = tdbTbcOpen(pMeta->pTbDb, &pReader->pTbc, NULL); if (code) { + taosMemoryFree(pReader); goto _err; } - code = tdbTbcMoveTo(pMetaSnapReader->pTbc, &(STbDbKey){.version = sver, .uid = INT64_MIN}, sizeof(STbDbKey), &c); + code = tdbTbcMoveTo(pReader->pTbc, &(STbDbKey){.version = sver, .uid = INT64_MIN}, sizeof(STbDbKey), &c); if (code) { + taosMemoryFree(pReader); goto _err; } - *ppReader = pMetaSnapReader; + metaInfo("vgId:%d vnode snapshot meta reader opened", TD_VID(pMeta->pVnode)); + + *ppReader = pReader; return code; _err: - metaError("vgId:%d meta snap reader open failed since %s", TD_VID(pMeta->pVnode), tstrerror(code)); + metaError("vgId:%d vnode snapshot meta reader open failed since %s", TD_VID(pMeta->pVnode), tstrerror(code)); *ppReader = NULL; return code; } int32_t metaSnapReaderClose(SMetaSnapReader** ppReader) { + int32_t code = 0; + tdbTbcClose((*ppReader)->pTbc); taosMemoryFree(*ppReader); *ppReader = NULL; - return 0; + + return code; } int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) { + int32_t code = 0; const void* pKey = NULL; const void* pData = NULL; int32_t nKey = 0; int32_t nData = 0; - int32_t code = 0; + STbDbKey key; + *ppData = NULL; for (;;) { - code = tdbTbcGet(pReader->pTbc, &pKey, &nKey, &pData, &nData); - if (code || ((STbDbKey*)pData)->version > pReader->ever) { - code = TSDB_CODE_VND_READ_END; + if (tdbTbcGet(pReader->pTbc, &pKey, &nKey, &pData, &nData)) { goto _exit; } - if (((STbDbKey*)pData)->version < pReader->sver) { + key = ((STbDbKey*)pKey)[0]; + if (key.version > pReader->ever) { + goto _exit; + } + + if (key.version < pReader->sver) { tdbTbcMoveToNext(pReader->pTbc); continue; } @@ -88,17 +100,28 @@ int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) { break; } - // copy the data - if (tRealloc(ppData, sizeof(SSnapDataHdr) + nData) < 0) { + ASSERT(pData && nData); + + *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + nData); + if (*ppData == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - return code; + goto _err; } - ((SSnapDataHdr*)(*ppData))->type = 0; // TODO: use macro - ((SSnapDataHdr*)(*ppData))->size = nData; - memcpy(((SSnapDataHdr*)(*ppData))->data, pData, nData); + + SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData); + pHdr->type = 0; // TODO: use macro + pHdr->size = nData; + memcpy(pHdr->data, pData, nData); + + metaInfo("vgId:%d vnode snapshot meta read data, version:%" PRId64 " uid:%" PRId64 " nData:%d", + TD_VID(pReader->pMeta->pVnode), key.version, key.uid, nData); _exit: return code; + +_err: + metaError("vgId:%d vnode snapshot meta read data failed since %s", TD_VID(pReader->pMeta->pVnode), tstrerror(code)); + return code; } // SMetaSnapWriter ======================================== @@ -108,18 +131,6 @@ struct SMetaSnapWriter { int64_t ever; }; -static int32_t metaSnapRollback(SMetaSnapWriter* pWriter) { - int32_t code = 0; - // TODO - return code; -} - -static int32_t metaSnapCommit(SMetaSnapWriter* pWriter) { - int32_t code = 0; - // TODO - return code; -} - int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWriter** ppWriter) { int32_t code = 0; SMetaSnapWriter* pWriter; @@ -148,10 +159,9 @@ int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback) { SMetaSnapWriter* pWriter = *ppWriter; if (rollback) { - code = metaSnapRollback(pWriter); - if (code) goto _err; + ASSERT(0); } else { - code = metaSnapCommit(pWriter); + code = metaCommit(pWriter->pMeta); if (code) goto _err; } taosMemoryFree(pWriter); @@ -170,15 +180,16 @@ int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) SMetaEntry metaEntry = {0}; SDecoder* pDecoder = &(SDecoder){0}; - tDecoderInit(pDecoder, pData, nData); + tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); metaDecodeEntry(pDecoder, &metaEntry); code = metaHandleEntry(pMeta, &metaEntry); if (code) goto _err; + tDecoderClear(pDecoder); return code; _err: - metaError("vgId:%d meta snapshot write failed since %s", TD_VID(pMeta->pVnode), tstrerror(code)); + metaError("vgId:%d vnode snapshot meta write failed since %s", TD_VID(pMeta->pVnode), tstrerror(code)); return code; } \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 605f4bf3d5..723caca5d7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -476,9 +476,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { if (--state->iFileSet >= 0) { pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet); } else { - // tBlockDataClear(&state->blockData); + // tBlockDataClear(&state->blockData, 1); if (state->pBlockData) { - tBlockDataClear(state->pBlockData); + tBlockDataClear(state->pBlockData, 1); state->pBlockData = NULL; } @@ -500,9 +500,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { if (code) goto _err; /* if (state->pBlockIdx) { */ - /* tBlockIdxReset(state->blockIdx); */ /* } */ - /* tBlockIdxReset(state->blockIdx); */ /* code = tMapDataSearch(&state->blockIdxMap, state->pBlockIdxExp, tGetBlockIdx, tCmprBlockIdx, * &state->blockIdx); */ @@ -582,8 +580,8 @@ _err: state->aBlockIdx = NULL; } if (state->pBlockData) { - // tBlockDataClear(&state->blockData); - tBlockDataClear(state->pBlockData); + // tBlockDataClear(&state->blockData, 1); + tBlockDataClear(state->pBlockData, 1); state->pBlockData = NULL; } @@ -609,8 +607,8 @@ int32_t clearNextRowFromFS(void *iter) { state->aBlockIdx = NULL; } if (state->pBlockData) { - // tBlockDataClear(&state->blockData); - tBlockDataClear(state->pBlockData); + // tBlockDataClear(&state->blockData, 1); + tBlockDataClear(state->pBlockData, 1); state->pBlockData = NULL; } diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 8a189d98b3..470bff1eae 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -263,7 +263,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { taosArrayClear(pCommitter->aBlockIdx); tMapDataReset(&pCommitter->oBlockMap); tBlockDataReset(&pCommitter->oBlockData); - pRSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, pCommitter->commitFid); + pRSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, pCommitter->commitFid, TD_EQ); if (pRSet) { code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pRSet); if (code) goto _err; @@ -284,16 +284,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { .fLast = {.commitID = pCommitter->commitID, .size = 0}, .fSma = pRSet->fSma}; } else { - STfs *pTfs = pTsdb->pVnode->pTfs; - SDiskID did = {.level = 0, .id = 0}; - - // TODO: alloc a new disk - // tfsAllocDisk(pTfs, 0, &did); - - // create the directory - tfsMkdirRecurAt(pTfs, pTsdb->path, did); - - wSet = (SDFileSet){.diskId = did, + wSet = (SDFileSet){.diskId = (SDiskID){.level = 0, .id = 0}, .fid = pCommitter->commitFid, .fHead = {.commitID = pCommitter->commitID, .offset = 0, .size = 0}, .fData = {.commitID = pCommitter->commitID, .size = 0}, @@ -1001,10 +992,10 @@ _exit: static void tsdbCommitDataEnd(SCommitter *pCommitter) { taosArrayDestroy(pCommitter->aBlockIdx); tMapDataClear(&pCommitter->oBlockMap); - tBlockDataClear(&pCommitter->oBlockData); + tBlockDataClear(&pCommitter->oBlockData, 1); taosArrayDestroy(pCommitter->aBlockIdxN); tMapDataClear(&pCommitter->nBlockMap); - tBlockDataClear(&pCommitter->nBlockData); + tBlockDataClear(&pCommitter->nBlockData, 1); tTSchemaDestroy(pCommitter->skmTable.pTSchema); tTSchemaDestroy(pCommitter->skmRow.pTSchema); } diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index 53b6735c30..70fbd90646 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -698,6 +698,6 @@ void tsdbFSStateDeleteDFileSet(STsdbFSState *pState, int32_t fid) { SDelFile *tsdbFSStateGetDelFile(STsdbFSState *pState) { return pState->pDelFile; } -SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid) { - return (SDFileSet *)taosArraySearch(pState->aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ); +SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid, int32_t flag) { + return (SDFileSet *)taosArraySearch(pState->aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, flag); } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index f0aea0cefb..7b4127e591 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -63,10 +63,10 @@ typedef struct SBlockLoadSuppInfo { } SBlockLoadSuppInfo; typedef struct SFilesetIter { - int32_t numOfFiles; // number of total files - int32_t index; // current accessed index in the list - SArray* pFileList; // data file list - int32_t order; + int32_t numOfFiles; // number of total files + int32_t index; // current accessed index in the list + SArray* pFileList; // data file list + int32_t order; } SFilesetIter; typedef struct SFileDataBlockInfo { @@ -830,8 +830,8 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - int32_t code = tsdbReadColData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pSupInfo->colIds, numOfCols, - pBlockData, NULL, NULL); + int32_t code = tsdbReadColData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pSupInfo->colIds, numOfCols, + pBlockData, NULL, NULL); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -1991,7 +1991,7 @@ static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pRead static int32_t moveToNextFile(STsdbReader* pReader, int32_t* numOfBlocks) { SReaderStatus* pStatus = &pReader->status; - SArray* pIndexList = taosArrayInit(4, sizeof(SBlockIdx)); + SArray* pIndexList = taosArrayInit(4, sizeof(SBlockIdx)); while (1) { bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader); @@ -3006,14 +3006,14 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData); if (code != TSDB_CODE_SUCCESS) { - tBlockDataClear(&pStatus->fileBlockData); + tBlockDataClear(&pStatus->fileBlockData, 1); terrno = code; return NULL; } copyBlockDataToSDataBlock(pReader, pBlockScanInfo); - tBlockDataClear(&pStatus->fileBlockData); + tBlockDataClear(&pStatus->fileBlockData, 1); return pReader->pResBlock->pDataBlock; } @@ -3132,8 +3132,8 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa hasNext = (pBlockIter->numOfBlocks > 0); } -// tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables, -// pReader->pFileGroup->fid, pReader->idStr); + // tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables, + // pReader->pFileGroup->fid, pReader->idStr); } return code; @@ -3204,4 +3204,3 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6 return TSDB_CODE_SUCCESS; } - diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 4e7a9d3b04..0ed2e12542 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -979,21 +979,21 @@ int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBl code = tBlockDataCopy(pBlockData, pBlockData2); if (code) { - tBlockDataClear(pBlockData1); - tBlockDataClear(pBlockData2); + tBlockDataClear(pBlockData1, 1); + tBlockDataClear(pBlockData2, 1); goto _err; } code = tBlockDataMerge(pBlockData1, pBlockData2, pBlockData); if (code) { - tBlockDataClear(pBlockData1); - tBlockDataClear(pBlockData2); + tBlockDataClear(pBlockData1, 1); + tBlockDataClear(pBlockData2, 1); goto _err; } } - tBlockDataClear(pBlockData1); - tBlockDataClear(pBlockData2); + tBlockDataClear(pBlockData1, 1); + tBlockDataClear(pBlockData2, 1); } tFree(pBuf1); @@ -1115,29 +1115,29 @@ int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *p for (iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) { code = tsdbReadSubBlockData(pReader, pBlockIdx, pBlock, iSubBlock, pBlockData1, ppBuf1, ppBuf2); if (code) { - tBlockDataClear(pBlockData1); - tBlockDataClear(pBlockData2); + tBlockDataClear(pBlockData1, 1); + tBlockDataClear(pBlockData2, 1); goto _err; } code = tBlockDataCopy(pBlockData, pBlockData2); if (code) { - tBlockDataClear(pBlockData1); - tBlockDataClear(pBlockData2); + tBlockDataClear(pBlockData1, 1); + tBlockDataClear(pBlockData2, 1); goto _err; } // merge two block data code = tBlockDataMerge(pBlockData1, pBlockData2, pBlockData); if (code) { - tBlockDataClear(pBlockData1); - tBlockDataClear(pBlockData2); + tBlockDataClear(pBlockData1, 1); + tBlockDataClear(pBlockData2, 1); goto _err; } } - tBlockDataClear(pBlockData1); - tBlockDataClear(pBlockData2); + tBlockDataClear(pBlockData1, 1); + tBlockDataClear(pBlockData2, 1); } ASSERT(pBlock->nRow == pBlockData->nRow); diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 06d4a86116..4886d874a9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -29,29 +29,27 @@ struct STsdbSnapReader { SBlockIdx* pBlockIdx; SMapData mBlock; // SMapData int32_t iBlock; - SBlockData blkData; + SBlockData oBlockData; + SBlockData nBlockData; // for del file int8_t delDone; SDelFReader* pDelFReader; + SArray* aDelIdx; // SArray int32_t iDelIdx; - SArray* aDelIdx; // SArray SArray* aDelData; // SArray }; static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { int32_t code = 0; + STsdb* pTsdb = pReader->pTsdb; while (true) { if (pReader->pDataFReader == NULL) { - SDFileSet* pSet = NULL; + SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->fs->cState, pReader->fid, TD_GT); - // search the next data file set to read (todo) - if (0 /* TODO */) { - code = TSDB_CODE_VND_READ_END; - goto _exit; - } + if (pSet == NULL) goto _exit; - // open + pReader->fid = pSet->fid; code = tsdbDataFReaderOpen(&pReader->pDataFReader, pReader->pTsdb, pSet); if (code) goto _err; @@ -61,6 +59,8 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { pReader->iBlockIdx = 0; pReader->pBlockIdx = NULL; + + tsdbInfo("vgId:%d vnode snapshot tsdb open data file to read, fid:%d", TD_VID(pTsdb->pVnode), pReader->fid); } while (true) { @@ -73,17 +73,15 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { pReader->pBlockIdx = (SBlockIdx*)taosArrayGet(pReader->aBlockIdx, pReader->iBlockIdx); pReader->iBlockIdx++; - // SBlock code = tsdbReadBlock(pReader->pDataFReader, pReader->pBlockIdx, &pReader->mBlock, NULL); if (code) goto _err; pReader->iBlock = 0; } + SBlock block; + SBlock* pBlock = █ while (true) { - SBlock block; - SBlock* pBlock = █ - if (pReader->iBlock >= pReader->mBlock.nItem) { pReader->pBlockIdx = NULL; break; @@ -92,15 +90,60 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { tMapDataGetItemByIdx(&pReader->mBlock, pReader->iBlock, pBlock, tGetBlock); pReader->iBlock++; - if ((pBlock->minVersion >= pReader->sver && pBlock->minVersion <= pReader->ever) || - (pBlock->maxVersion >= pReader->sver && pBlock->maxVersion <= pReader->ever)) { - // overlap (todo) + if (pBlock->minVersion > pReader->ever || pBlock->maxVersion < pReader->sver) continue; - code = tsdbReadBlockData(pReader->pDataFReader, pReader->pBlockIdx, pBlock, &pReader->blkData, NULL, NULL); + code = tsdbReadBlockData(pReader->pDataFReader, pReader->pBlockIdx, pBlock, &pReader->oBlockData, NULL, NULL); + if (code) goto _err; + + // filter + tBlockDataReset(&pReader->nBlockData); + for (int32_t iColData = 0; iColData < taosArrayGetSize(pReader->oBlockData.aIdx); iColData++) { + SColData* pColDataO = tBlockDataGetColDataByIdx(&pReader->oBlockData, iColData); + SColData* pColDataN = NULL; + + code = tBlockDataAddColData(&pReader->nBlockData, taosArrayGetSize(pReader->nBlockData.aIdx), &pColDataN); if (code) goto _err; - goto _exit; + tColDataInit(pColDataN, pColDataO->cid, pColDataO->type, pColDataO->smaOn); } + + for (int32_t iRow = 0; iRow < pReader->oBlockData.nRow; iRow++) { + TSDBROW row = tsdbRowFromBlockData(&pReader->oBlockData, iRow); + int64_t version = TSDBROW_VERSION(&row); + + if (version < pReader->sver || version > pReader->ever) continue; + + code = tBlockDataAppendRow(&pReader->nBlockData, &row, NULL); + if (code) goto _err; + } + + // org data + // compress data (todo) + int32_t size = sizeof(TABLEID) + tPutBlockData(NULL, &pReader->nBlockData); + + *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size); + if (*ppData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData); + pHdr->type = 1; + pHdr->size = size; + + TABLEID* pId = (TABLEID*)(&pHdr[1]); + pId->suid = pReader->pBlockIdx->suid; + pId->uid = pReader->pBlockIdx->uid; + + tPutBlockData((uint8_t*)(&pId[1]), &pReader->nBlockData); + + tsdbInfo("vgId:%d vnode snapshot read data, fid:%d suid:%" PRId64 " uid:%" PRId64 + " iBlock:%d minVersion:%d maxVersion:%d nRow:%d out of %d size:%d", + TD_VID(pTsdb->pVnode), pReader->fid, pReader->pBlockIdx->suid, pReader->pBlockIdx->uid, + pReader->iBlock - 1, pBlock->minVersion, pBlock->maxVersion, pReader->nBlockData.nRow, pBlock->nRow, + size); + + goto _exit; } } } @@ -109,7 +152,7 @@ _exit: return code; _err: - tsdbError("vgId:%d snap read data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d vnode snapshot tsdb read data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); return code; } @@ -120,7 +163,6 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) { if (pReader->pDelFReader == NULL) { if (pDelFile == NULL) { - code = TSDB_CODE_VND_READ_END; goto _exit; } @@ -135,15 +177,20 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) { pReader->iDelIdx = 0; } - while (pReader->iDelIdx < taosArrayGetSize(pReader->aDelIdx)) { + while (true) { + if (pReader->iDelIdx >= taosArrayGetSize(pReader->aDelIdx)) { + tsdbDelFReaderClose(&pReader->pDelFReader); + break; + } + SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pReader->aDelIdx, pReader->iDelIdx); - int32_t size = 0; pReader->iDelIdx++; code = tsdbReadDelData(pReader->pDelFReader, pDelIdx, pReader->aDelData, NULL); if (code) goto _err; + int32_t size = 0; for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); iDelData++) { SDelData* pDelData = (SDelData*)taosArrayGet(pReader->aDelData, iDelData); @@ -152,46 +199,44 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) { } } - if (size > 0) { - int64_t n = 0; + if (size == 0) continue; - size = size + sizeof(SSnapDataHdr) + sizeof(TABLEID); - code = tRealloc(ppData, size); - if (code) goto _err; - - // SSnapDataHdr - SSnapDataHdr* pSnapDataHdr = (SSnapDataHdr*)(*ppData + n); - pSnapDataHdr->type = 1; - pSnapDataHdr->size = size; // TODO: size here may incorrect - n += sizeof(SSnapDataHdr); - - // TABLEID - TABLEID* pId = (TABLEID*)(*ppData + n); - pId->suid = pDelIdx->suid; - pId->uid = pDelIdx->uid; - n += sizeof(*pId); - - // DATA - for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); iDelData++) { - SDelData* pDelData = (SDelData*)taosArrayGet(pReader->aDelData, iDelData); - - if (pDelData->version >= pReader->sver && pDelData->version <= pReader->ever) { - n += tPutDelData(*ppData + n, pDelData); - } - } - - goto _exit; + // org data + size = sizeof(TABLEID) + size; + *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size); + if (*ppData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; } - } - code = TSDB_CODE_VND_READ_END; - tsdbDelFReaderClose(&pReader->pDelFReader); + SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData); + pHdr->type = 2; + pHdr->size = size; + + TABLEID* pId = (TABLEID*)(&pHdr[1]); + pId->suid = pDelIdx->suid; + pId->uid = pDelIdx->uid; + int32_t n = sizeof(SSnapDataHdr) + sizeof(TABLEID); + for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); iDelData++) { + SDelData* pDelData = (SDelData*)taosArrayGet(pReader->aDelData, iDelData); + + if (pDelData->version < pReader->sver) continue; + if (pDelData->version > pReader->ever) continue; + + n += tPutDelData((*ppData) + n, pDelData); + } + + tsdbInfo("vgId:%d vnode snapshot tsdb read del data, suid:%" PRId64 " uid:%d" PRId64 " size:%d", + TD_VID(pTsdb->pVnode), pDelIdx->suid, pDelIdx->uid, size); + + break; + } _exit: return code; _err: - tsdbError("vgId:%d snap read del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d vnode snapshot tsdb read del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); return code; } @@ -209,15 +254,16 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapRe pReader->sver = sver; pReader->ever = ever; + pReader->fid = INT32_MIN; pReader->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); if (pReader->aBlockIdx == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - pReader->mBlock = tMapDataInit(); - - code = tBlockDataInit(&pReader->blkData); + code = tBlockDataInit(&pReader->oBlockData); + if (code) goto _err; + code = tBlockDataInit(&pReader->nBlockData); if (code) goto _err; pReader->aDelIdx = taosArrayInit(0, sizeof(SDelIdx)); @@ -225,18 +271,18 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapRe code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - pReader->aDelData = taosArrayInit(0, sizeof(SDelData)); if (pReader->aDelData == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } + tsdbInfo("vgId:%d vnode snapshot tsdb reader opened", TD_VID(pTsdb->pVnode)); *ppReader = pReader; return code; _err: - tsdbError("vgId:%d snapshot reader open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d vnode snapshot tsdb reader open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); *ppReader = NULL; return code; } @@ -245,37 +291,43 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) { int32_t code = 0; STsdbSnapReader* pReader = *ppReader; - taosArrayDestroy(pReader->aDelData); - taosArrayDestroy(pReader->aDelIdx); - if (pReader->pDelFReader) { - tsdbDelFReaderClose(&pReader->pDelFReader); - } - tBlockDataClear(&pReader->blkData); - tMapDataClear(&pReader->mBlock); - taosArrayDestroy(pReader->aBlockIdx); if (pReader->pDataFReader) { tsdbDataFReaderClose(&pReader->pDataFReader); } + taosArrayDestroy(pReader->aBlockIdx); + tMapDataClear(&pReader->mBlock); + tBlockDataClear(&pReader->oBlockData, 1); + tBlockDataClear(&pReader->nBlockData, 1); + + if (pReader->pDelFReader) { + tsdbDelFReaderClose(&pReader->pDelFReader); + } + taosArrayDestroy(pReader->aDelIdx); + taosArrayDestroy(pReader->aDelData); + + tsdbInfo("vgId:%d vnode snapshot tsdb reader closed", TD_VID(pReader->pTsdb->pVnode)); + taosMemoryFree(pReader); *ppReader = NULL; - return code; } int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) { int32_t code = 0; + *ppData = NULL; + // read data file if (!pReader->dataDone) { code = tsdbSnapReadData(pReader, ppData); if (code) { - if (code == TSDB_CODE_VND_READ_END) { - pReader->dataDone = 1; - } else { - goto _err; - } + goto _err; } else { - goto _exit; + if (*ppData) { + goto _exit; + } else { + pReader->dataDone = 1; + } } } @@ -283,23 +335,21 @@ int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) { if (!pReader->delDone) { code = tsdbSnapReadDel(pReader, ppData); if (code) { - if (code == TSDB_CODE_VND_READ_END) { - pReader->delDone = 1; - } else { - goto _err; - } + goto _err; } else { - goto _exit; + if (*ppData) { + goto _exit; + } else { + pReader->delDone = 1; + } } } - code = TSDB_CODE_VND_READ_END; - _exit: return code; _err: - tsdbError("vgId:%d snapshot read failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d vnode snapshot tsdb read failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); return code; } @@ -315,74 +365,40 @@ struct STsdbSnapWriter { int32_t minRow; int32_t maxRow; int8_t cmprAlg; + int64_t commitID; // for data file + SBlockData bData; + int32_t fid; SDataFReader* pDataFReader; - SArray* aBlockIdx; + SArray* aBlockIdx; // SArray int32_t iBlockIdx; SBlockIdx* pBlockIdx; - SMapData mBlock; + SMapData mBlock; // SMapData int32_t iBlock; - SBlock* pBlock; - SBlock block; - SBlockData blockData; + SBlockData* pBlockData; int32_t iRow; + SBlockData bDataR; SDataFWriter* pDataFWriter; - SArray* aBlockIdxN; - SBlockIdx* pBlockIdxN; - SBlockIdx blockIdx; - SMapData mBlockN; - SBlock* pBlockN; - SBlock blockN; - SBlockData nBlockData; + SBlockIdx* pBlockIdxW; // NULL when no committing table + SBlock blockW; + SBlockData bDataW; + SBlockIdx blockIdxW; + + SMapData mBlockW; // SMapData + SArray* aBlockIdxW; // SArray // for del file SDelFReader* pDelFReader; SDelFWriter* pDelFWriter; int32_t iDelIdx; - SArray* aDelIdx; + SArray* aDelIdxR; SArray* aDelData; - SArray* aDelIdxN; + SArray* aDelIdxW; }; -static int32_t tsdbSnapRollback(STsdbSnapWriter* pWriter) { - int32_t code = 0; - // TODO - return code; -} - -static int32_t tsdbSnapCommit(STsdbSnapWriter* pWriter) { - int32_t code = 0; - // TODO - return code; -} - -static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) { - int32_t code = 0; - STsdb* pTsdb = pWriter->pTsdb; - - if (pWriter->pDataFWriter == NULL) goto _exit; - - // TODO - - code = tsdbDataFWriterClose(&pWriter->pDataFWriter, 0); - if (code) goto _err; - - if (pWriter->pDataFReader) { - code = tsdbDataFReaderClose(&pWriter->pDataFReader); - if (code) goto _err; - } - -_exit: - return code; - -_err: - tsdbError("vgId:%d tsdb snapshot writer data end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - return code; -} - static int32_t tsdbSnapWriteAppendData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t code = 0; int32_t iRow = 0; // todo @@ -390,7 +406,7 @@ static int32_t tsdbSnapWriteAppendData(STsdbSnapWriter* pWriter, uint8_t* pData, SBlockData* pBlockData = NULL; // todo while (iRow < nRow) { - code = tBlockDataAppendRow(&pWriter->nBlockData, &tsdbRowFromBlockData(pBlockData, iRow), NULL); + code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pBlockData, iRow), NULL); if (code) goto _err; } @@ -401,160 +417,488 @@ _err: return code; } -static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWrite) { +static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) { int32_t code = 0; - // TODO - return code; -} -static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { - int32_t code = 0; - TABLEID id = {0}; // TODO + ASSERT(pWriter->pDataFWriter); - // skip - while (pWriter->pBlockIdx && tTABLEIDCmprFn(&id, pWriter->pBlockIdx) < 0) { - code = tsdbSnapWriteTableDataEnd(pWriter); - if (code) goto _err; + if (pWriter->pBlockIdxW == NULL) goto _exit; - pWriter->iBlockIdx++; - if (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) { - pWriter->pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx); - } else { - pWriter->pBlockIdx = NULL; + // consume remain rows + if (pWriter->pBlockData) { + ASSERT(pWriter->iRow < pWriter->pBlockData->nRow); + while (pWriter->iRow < pWriter->pBlockData->nRow) { + code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow), NULL); + if (code) goto _err; + + if (pWriter->bDataW.nRow >= pWriter->maxRow * 4 / 5) { + pWriter->blockW.last = 0; + code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW, + &pWriter->blockW, pWriter->cmprAlg); + if (code) goto _err; + + code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock); + if (code) goto _err; + + tBlockReset(&pWriter->blockW); + tBlockDataClearData(&pWriter->bDataW); + } + + pWriter->iRow++; } } - // new or merge - if (pWriter->pBlockIdx == NULL || tTABLEIDCmprFn(&id, pWriter->pBlockIdx) < 0) { - int32_t c; - - if (pWriter->pBlockIdxN && ((c = tTABLEIDCmprFn(&id, pWriter->pBlockIdxN)) != 0)) { - ASSERT(c > 0); - - code = tsdbSnapWriteTableDataEnd(pWriter); - if (code) goto _err; - } - - if (pWriter->pBlockIdxN == NULL) { - pWriter->pBlockIdx = &pWriter->blockIdx; - pWriter->pBlockIdx->suid = id.suid; - pWriter->pBlockIdx->uid = id.uid; - } - - // loop to write the data - TSDBROW* pRow = NULL; // todo - int32_t nRow = 0; // todo - SBlockData* pBlockData = NULL; // todo - for (int32_t iRow = 0; iRow < nRow; iRow++) { - code = tBlockDataAppendRow(&pWriter->nBlockData, &tsdbRowFromBlockData(pBlockData, iRow), NULL); - if (code) goto _err; - - if (pWriter->nBlockData.nRow > pWriter->maxRow * 4 / 5) { - code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->nBlockData, NULL, NULL, pWriter->pBlockIdxN, - pWriter->pBlockN, pWriter->cmprAlg); - if (code) goto _err; + // write remain data if has + if (pWriter->bDataW.nRow > 0) { + pWriter->blockW.last = 0; + if (pWriter->bDataW.nRow < pWriter->minRow) { + if (pWriter->iBlock > pWriter->mBlock.nItem) { + pWriter->blockW.last = 1; } } - } else { - // skip - while (true) { - if (pWriter->pBlock == NULL) break; - if (pWriter->pBlock->last) break; - if (tBlockCmprFn(&(SBlock){.minKey = {0}, .maxKey = {0}}, pWriter->pBlock) >= 0) break; - code = tMapDataPutItem(&pWriter->mBlockN, pWriter->pBlock, tPutBlock); + code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW, + &pWriter->blockW, pWriter->cmprAlg); + if (code) goto _err; + + code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock); + if (code) goto _err; + } + + while (true) { + if (pWriter->iBlock >= pWriter->mBlock.nItem) break; + + SBlock block; + tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetBlock); + + if (block.last) { + code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, &pWriter->bDataR, NULL, NULL); + if (code) goto _err; + + tBlockReset(&block); + block.last = 1; + code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataR, NULL, NULL, pWriter->pBlockIdxW, &block, + pWriter->cmprAlg); if (code) goto _err; } - if (pWriter->pBlock) { - if (pWriter->pBlock->last) { - // load the last block and merge with the data (todo) - } else { - int32_t c = tBlockCmprFn(&(SBlock){0 /*TODO*/}, pWriter->pBlock); + code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock); + if (code) goto _err; - if (c > 0) { - // commit until pWriter->pBlock (todo) + pWriter->iBlock++; + } + + // SBlock + code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, pWriter->pBlockIdxW); + if (code) goto _err; + + // SBlockIdx + if (taosArrayPush(pWriter->aBlockIdxW, pWriter->pBlockIdxW) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + +_exit: + return code; + +_err: + return code; +} + +static int32_t tsdbSnapMoveWriteTableData(STsdbSnapWriter* pWriter, SBlockIdx* pBlockIdx) { + int32_t code = 0; + + code = tsdbReadBlock(pWriter->pDataFReader, pBlockIdx, &pWriter->mBlock, NULL); + if (code) goto _err; + + // SBlockData + SBlock block; + tMapDataReset(&pWriter->mBlockW); + for (int32_t iBlock = 0; iBlock < pWriter->mBlock.nItem; iBlock++) { + tMapDataGetItemByIdx(&pWriter->mBlock, iBlock, &block, tGetBlock); + + if (block.last) { + code = tsdbReadBlockData(pWriter->pDataFReader, pBlockIdx, &block, &pWriter->bDataR, NULL, NULL); + if (code) goto _err; + + tBlockReset(&block); + block.last = 1; + code = + tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataR, NULL, NULL, pBlockIdx, &block, pWriter->cmprAlg); + if (code) goto _err; + } + + code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock); + if (code) goto _err; + } + + // SBlock + SBlockIdx blockIdx = {.suid = pBlockIdx->suid, .uid = pBlockIdx->uid}; + code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, &blockIdx); + if (code) goto _err; + + // SBlockIdx + if (taosArrayPush(pWriter->aBlockIdxW, &blockIdx) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + +_exit: + return code; + +_err: + return code; +} + +static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) { + int32_t code = 0; + SBlockData* pBlockData = &pWriter->bData; + int32_t iRow = 0; + TSDBROW row; + TSDBROW* pRow = &row; + + // correct schema + code = tBlockDataCorrectSchema(&pWriter->bDataW, pBlockData); + if (code) goto _err; + + // loop to merge + *pRow = tsdbRowFromBlockData(pBlockData, iRow); + while (true) { + if (pRow == NULL) break; + + if (pWriter->pBlockData) { + ASSERT(pWriter->iRow < pWriter->pBlockData->nRow); + + int32_t c = tsdbRowCmprFn(pRow, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow)); + + ASSERT(c); + + if (c < 0) { + code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL); + if (code) goto _err; + + iRow++; + if (iRow < pWriter->pBlockData->nRow) { + *pRow = tsdbRowFromBlockData(pBlockData, iRow); } else { - // load the block and merge with the data (todo) + pRow = NULL; + } + } else if (c > 0) { + code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow), NULL); + if (code) goto _err; + + pWriter->iRow++; + if (pWriter->iRow >= pWriter->pBlockData->nRow) { + pWriter->pBlockData = NULL; } } } else { - int32_t nRow = 0; - SBlockData* pBlockData = NULL; + TSDBKEY key = TSDBROW_KEY(pRow); - for (int32_t iRow = 0; iRow < nRow; iRow++) { - code = tBlockDataAppendRow(&pWriter->nBlockData, &tsdbRowFromBlockData(pBlockData, iRow), NULL); - if (code) goto _err; + while (true) { + if (pWriter->iBlock >= pWriter->mBlock.nItem) break; - if (pWriter->nBlockData.nRow >= pWriter->maxRow * 4 / 5) { - code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->nBlockData, NULL, NULL, pWriter->pBlockIdxN, - pWriter->pBlockN, pWriter->cmprAlg); + SBlock block; + int32_t c; + + tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetBlock); + + if (block.last) { + pWriter->pBlockData = &pWriter->bDataR; + + code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, pWriter->pBlockData, NULL, NULL); + if (code) goto _err; + pWriter->iRow = 0; + + pWriter->iBlock++; + break; + } + + c = tsdbKeyCmprFn(&block.maxKey, &key); + + ASSERT(c); + + if (c < 0) { + if (pWriter->bDataW.nRow) { + pWriter->blockW.last = 0; + code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW, + &pWriter->blockW, pWriter->cmprAlg); + if (code) goto _err; + + code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock); + if (code) goto _err; + + tBlockReset(&pWriter->blockW); + tBlockDataClearData(&pWriter->bDataW); + } + + code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock); if (code) goto _err; - tBlockDataClearData(&pWriter->nBlockData); + pWriter->iBlock++; + } else { + c = tsdbKeyCmprFn(&tBlockDataLastKey(pBlockData), &block.minKey); + + ASSERT(c); + + if (c > 0) { + pWriter->pBlockData = &pWriter->bDataR; + code = + tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, pWriter->pBlockData, NULL, NULL); + if (code) goto _err; + pWriter->iRow = 0; + + pWriter->iBlock++; + } + break; } } + + if (pWriter->pBlockData) continue; + + code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL); + if (code) goto _err; + + iRow++; + if (iRow < pBlockData->nRow) { + *pRow = tsdbRowFromBlockData(pBlockData, iRow); + } else { + pRow = NULL; + } } + + _check_write: + if (pWriter->bDataW.nRow < pWriter->maxRow * 4 / 5) continue; + + _write_block: + code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdx, &pWriter->blockW, + pWriter->cmprAlg); + if (code) goto _err; + + code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock); + if (code) goto _err; + + tBlockReset(&pWriter->blockW); + tBlockDataClearData(&pWriter->bDataW); } return code; _err: - tsdbError("vgId:%d tsdb snapshot write table data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d vnode snapshot tsdb write table data impl failed since %s", TD_VID(pWriter->pTsdb->pVnode), + tstrerror(code)); + return code; +} + +static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, TABLEID id) { + int32_t code = 0; + SBlockData* pBlockData = &pWriter->bData; + TSDBKEY keyFirst = tBlockDataFirstKey(pBlockData); + TSDBKEY keyLast = tBlockDataLastKey(pBlockData); + + // end last table write if should + if (pWriter->pBlockIdxW) { + int32_t c = tTABLEIDCmprFn(pWriter->pBlockIdxW, &id); + if (c < 0) { + // end + code = tsdbSnapWriteTableDataEnd(pWriter); + if (code) goto _err; + + // reset + pWriter->pBlockIdxW = NULL; + } else if (c > 0) { + ASSERT(0); + } + } + + // start new table data write if need + if (pWriter->pBlockIdxW == NULL) { + // write table data ahead + while (true) { + if (pWriter->iBlockIdx >= taosArrayGetSize(pWriter->aBlockIdx)) break; + + SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx); + int32_t c = tTABLEIDCmprFn(pBlockIdx, &id); + + if (c >= 0) break; + + code = tsdbSnapMoveWriteTableData(pWriter, pBlockIdx); + if (code) goto _err; + + pWriter->iBlockIdx++; + } + + // reader + pWriter->pBlockIdx = NULL; + if (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) { + ASSERT(pWriter->pDataFReader); + + SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlock); + int32_t c = tTABLEIDCmprFn(pBlockIdx, &id); + + ASSERT(c >= 0); + + if (c == 0) { + pWriter->pBlockIdx = pBlockIdx; + pWriter->iBlockIdx++; + } + } + + if (pWriter->pBlockIdx) { + code = tsdbReadBlock(pWriter->pDataFReader, pWriter->pBlockIdx, &pWriter->mBlock, NULL); + if (code) goto _err; + } else { + tMapDataReset(&pWriter->mBlock); + } + pWriter->iBlock = 0; + pWriter->pBlockData = NULL; + pWriter->iRow = 0; + + // writer + pWriter->pBlockIdxW = &pWriter->blockIdxW; + pWriter->pBlockIdxW->suid = id.suid; + pWriter->pBlockIdxW->uid = id.uid; + + tBlockReset(&pWriter->blockW); + tBlockDataReset(&pWriter->bDataW); + tMapDataReset(&pWriter->mBlockW); + } + + ASSERT(pWriter->pBlockIdxW && pWriter->pBlockIdxW->suid == id.suid && pWriter->pBlockIdxW->uid == id.uid); + ASSERT(pWriter->pBlockIdx == NULL || (pWriter->pBlockIdx->suid == id.suid && pWriter->pBlockIdx->uid == id.uid)); + + code = tsdbSnapWriteTableDataImpl(pWriter); + if (code) goto _err; + +_exit: + return code; + +_err: + tsdbError("vgId:%d vnode snapshot tsdb write data impl failed since %s", TD_VID(pWriter->pTsdb->pVnode), + tstrerror(code)); + return code; +} + +static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) { + int32_t code = 0; + STsdb* pTsdb = pWriter->pTsdb; + + if (pWriter->pDataFWriter == NULL) goto _exit; + + code = tsdbSnapWriteTableDataEnd(pWriter); + if (code) goto _err; + + while (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) { + code = tsdbSnapMoveWriteTableData(pWriter, (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx)); + if (code) goto _err; + + pWriter->iBlockIdx++; + } + + code = tsdbWriteBlockIdx(pWriter->pDataFWriter, pWriter->aBlockIdxW, NULL); + if (code) goto _err; + + code = tsdbFSStateUpsertDFileSet(pTsdb->fs->nState, tsdbDataFWriterGetWSet(pWriter->pDataFWriter)); + if (code) goto _err; + + code = tsdbDataFWriterClose(&pWriter->pDataFWriter, 1); + if (code) goto _err; + + if (pWriter->pDataFReader) { + code = tsdbDataFReaderClose(&pWriter->pDataFReader); + if (code) goto _err; + } + +_exit: + tsdbError("vgId:%d vnode snapshot tsdb writer data end", TD_VID(pTsdb->pVnode)); + return code; + +_err: + tsdbError("vgId:%d vnode snapshot tsdb writer data end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); return code; } static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t code = 0; STsdb* pTsdb = pWriter->pTsdb; - int64_t skey; // todo - int64_t ekey; // todo + TABLEID id = *(TABLEID*)(pData + sizeof(SSnapDataHdr)); + int64_t n; - int32_t fid = tsdbKeyFid(skey, pWriter->minutes, pWriter->precision); - ASSERT(fid == tsdbKeyFid(ekey, pWriter->minutes, pWriter->precision)); + // decode + SBlockData* pBlockData = &pWriter->bData; + n = tGetBlockData(pData + sizeof(SSnapDataHdr) + sizeof(TABLEID), pBlockData); + ASSERT(n + sizeof(SSnapDataHdr) + sizeof(TABLEID) == nData); - // begin + // open file + TSDBKEY keyFirst = tBlockDataFirstKey(pBlockData); + TSDBKEY keyLast = tBlockDataLastKey(pBlockData); + + int32_t fid = tsdbKeyFid(keyFirst.ts, pWriter->minutes, pWriter->precision); + ASSERT(fid == tsdbKeyFid(keyLast.ts, pWriter->minutes, pWriter->precision)); if (pWriter->pDataFWriter == NULL || pWriter->fid != fid) { + // end last file data write if need code = tsdbSnapWriteDataEnd(pWriter); if (code) goto _err; pWriter->fid = fid; - SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, fid); - // reader + + // read + SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, fid, TD_EQ); if (pSet) { - // open code = tsdbDataFReaderOpen(&pWriter->pDataFReader, pTsdb, pSet); if (code) goto _err; - // SBlockIdx code = tsdbReadBlockIdx(pWriter->pDataFReader, pWriter->aBlockIdx, NULL); if (code) goto _err; } else { + ASSERT(pWriter->pDataFReader == NULL); taosArrayClear(pWriter->aBlockIdx); } pWriter->iBlockIdx = 0; + pWriter->pBlockIdx = NULL; + tMapDataReset(&pWriter->mBlock); + pWriter->iBlock = 0; + pWriter->pBlockData = NULL; + pWriter->iRow = 0; + tBlockDataReset(&pWriter->bDataR); - // writer - SDFileSet wSet = {0}; - if (pSet == NULL) { - wSet = (SDFileSet){0}; // todo + // write + SDFileSet wSet; + + if (pSet) { + wSet = (SDFileSet){.diskId = pSet->diskId, + .fid = fid, + .fHead = {.commitID = pWriter->commitID, .offset = 0, .size = 0}, + .fData = pSet->fData, + .fLast = {.commitID = pWriter->commitID, .size = 0}, + .fSma = pSet->fSma}; } else { - wSet = (SDFileSet){0}; // todo + wSet = (SDFileSet){.diskId = (SDiskID){.level = 0, .id = 0}, + .fid = fid, + .fHead = {.commitID = pWriter->commitID, .offset = 0, .size = 0}, + .fData = {.commitID = pWriter->commitID, .size = 0}, + .fLast = {.commitID = pWriter->commitID, .size = 0}, + .fSma = {.commitID = pWriter->commitID, .size = 0}}; } code = tsdbDataFWriterOpen(&pWriter->pDataFWriter, pTsdb, &wSet); if (code) goto _err; - taosArrayClear(pWriter->aBlockIdxN); + taosArrayClear(pWriter->aBlockIdxW); + tMapDataReset(&pWriter->mBlockW); + pWriter->pBlockIdxW = NULL; + tBlockDataReset(&pWriter->bDataW); } - code = tsdbSnapWriteTableData(pWriter, pData, nData); + code = tsdbSnapWriteTableData(pWriter, id); if (code) goto _err; + tsdbInfo("vgId:%d vnode snapshot tsdb write data, fid:%d suid:%" PRId64 " uid:%" PRId64 " nRow:%d", + TD_VID(pTsdb->pVnode), fid, id.suid, id.suid, pBlockData->nRow); return code; _err: - tsdbError("vgId:%d tsdb snapshot write data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d vnode snapshot tsdb write data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); return code; } @@ -570,28 +914,28 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32 code = tsdbDelFReaderOpen(&pWriter->pDelFReader, pDelFile, pTsdb, NULL); if (code) goto _err; - code = tsdbReadDelIdx(pWriter->pDelFReader, pWriter->aDelIdx, NULL); + code = tsdbReadDelIdx(pWriter->pDelFReader, pWriter->aDelIdxR, NULL); if (code) goto _err; } // writer - SDelFile delFile = {.commitID = pTsdb->pVnode->state.commitID, .offset = 0, .size = 0}; + SDelFile delFile = {.commitID = pWriter->commitID, .offset = 0, .size = 0}; code = tsdbDelFWriterOpen(&pWriter->pDelFWriter, &delFile, pTsdb); if (code) goto _err; } // process the del data - TABLEID id = {0}; // todo + TABLEID id = *(TABLEID*)(pData + sizeof(SSnapDataHdr)); while (true) { SDelIdx* pDelIdx = NULL; - int64_t n = 0; + int64_t n = sizeof(SSnapDataHdr) + sizeof(TABLEID); SDelData delData; SDelIdx delIdx; int8_t toBreak = 0; - if (pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdx)) { - pDelIdx = taosArrayGet(pWriter->aDelIdx, pWriter->iDelIdx); + if (pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR)) { + pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx); } if (pDelIdx) { @@ -632,7 +976,7 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32 code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, NULL, &delIdx); if (code) goto _err; - if (taosArrayPush(pWriter->aDelIdxN, &delIdx) == NULL) { + if (taosArrayPush(pWriter->aDelIdxW, &delIdx) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } @@ -644,7 +988,7 @@ _exit: return code; _err: - tsdbError("vgId:%d tsdb snapshot write del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d vnode snapshot tsdb write del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); return code; } @@ -653,8 +997,9 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) { STsdb* pTsdb = pWriter->pTsdb; if (pWriter->pDelFWriter == NULL) goto _exit; - for (; pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdx); pWriter->iDelIdx++) { - SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdx, pWriter->iDelIdx); + + for (; pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR); pWriter->iDelIdx++) { + SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx); code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData, NULL); if (code) goto _err; @@ -663,7 +1008,7 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) { code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, NULL, &delIdx); if (code) goto _err; - if (taosArrayPush(pWriter->aDelIdx, &delIdx) == NULL) { + if (taosArrayPush(pWriter->aDelIdxR, &delIdx) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } @@ -684,10 +1029,11 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) { } _exit: + tsdbInfo("vgId:%d vnode snapshot tsdb write del end", TD_VID(pTsdb->pVnode)); return code; _err: - tsdbError("vgId:%d tsdb snapshow write del end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d vnode snapshot tsdb write del end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); return code; } @@ -705,6 +1051,54 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr pWriter->sver = sver; pWriter->ever = ever; + // config + pWriter->minutes = pTsdb->keepCfg.days; + pWriter->precision = pTsdb->keepCfg.precision; + pWriter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows; + pWriter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows; + pWriter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression; + pWriter->commitID = pTsdb->pVnode->state.commitID; + + // for data file + code = tBlockDataInit(&pWriter->bData); + + if (code) goto _err; + pWriter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); + if (pWriter->aBlockIdx == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + code = tBlockDataInit(&pWriter->bDataR); + if (code) goto _err; + + pWriter->aBlockIdxW = taosArrayInit(0, sizeof(SBlockIdx)); + if (pWriter->aBlockIdxW == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + code = tBlockDataInit(&pWriter->bDataW); + if (code) goto _err; + + // for del file + pWriter->aDelIdxR = taosArrayInit(0, sizeof(SDelIdx)); + if (pWriter->aDelIdxR == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + pWriter->aDelData = taosArrayInit(0, sizeof(SDelData)); + if (pWriter->aDelData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + pWriter->aDelIdxW = taosArrayInit(0, sizeof(SDelIdx)); + if (pWriter->aDelIdxW == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + code = tsdbFSBegin(pTsdb->fs); + if (code) goto _err; + *ppWriter = pWriter; return code; @@ -719,7 +1113,7 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) { STsdbSnapWriter* pWriter = *ppWriter; if (rollback) { - code = tsdbSnapRollback(pWriter); + code = tsdbFSRollback(pWriter->pTsdb->fs); if (code) goto _err; } else { code = tsdbSnapWriteDataEnd(pWriter); @@ -728,7 +1122,7 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) { code = tsdbSnapWriteDelEnd(pWriter); if (code) goto _err; - code = tsdbSnapCommit(pWriter); + code = tsdbFSCommit(pWriter->pTsdb->fs); if (code) goto _err; } @@ -738,29 +1132,35 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) { return code; _err: - tsdbError("vgId:%d tsdb snapshot writer close failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d vnode snapshot tsdb writer close failed since %s", TD_VID(pWriter->pTsdb->pVnode), + tstrerror(code)); return code; } int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { - int32_t code = 0; - int8_t type = pData[0]; + int32_t code = 0; + SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; // ts data - if (type == 0) { - code = tsdbSnapWriteData(pWriter, pData + 1, nData - 1); + if (pHdr->type == 1) { + code = tsdbSnapWriteData(pWriter, pData, nData); if (code) goto _err; + + goto _exit; } else { - code = tsdbSnapWriteDataEnd(pWriter); - if (code) goto _err; + if (pWriter->pDataFWriter) { + code = tsdbSnapWriteDataEnd(pWriter); + if (code) goto _err; + } } // del data - if (type == 1) { - code = tsdbSnapWriteDel(pWriter, pData + 1, nData - 1); + if (pHdr->type == 2) { + code = tsdbSnapWriteDel(pWriter, pData, nData); if (code) goto _err; } +_exit: return code; _err: diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index dc6c823368..f318c69c6f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -36,15 +36,15 @@ int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(u // alloc code = tRealloc((uint8_t **)&pMapData->aOffset, sizeof(int32_t) * pMapData->nItem); - if (code) goto _err; + if (code) goto _exit; code = tRealloc(&pMapData->pData, pMapData->nData); - if (code) goto _err; + if (code) goto _exit; // put pMapData->aOffset[nItem] = offset; tPutItemFn(pMapData->pData + offset, pItem); -_err: +_exit: return code; } @@ -189,25 +189,12 @@ static FORCE_INLINE int32_t tGetTSDBKEY(uint8_t *p, TSDBKEY *pKey) { } // SBlockIdx ====================================================== -void tBlockIdxReset(SBlockIdx *pBlockIdx) { - pBlockIdx->minKey = TSKEY_MAX; - pBlockIdx->maxKey = TSKEY_MIN; - pBlockIdx->minVersion = VERSION_MAX; - pBlockIdx->maxVersion = VERSION_MIN; - pBlockIdx->offset = -1; - pBlockIdx->size = -1; -} - int32_t tPutBlockIdx(uint8_t *p, void *ph) { int32_t n = 0; SBlockIdx *pBlockIdx = (SBlockIdx *)ph; n += tPutI64(p ? p + n : p, pBlockIdx->suid); n += tPutI64(p ? p + n : p, pBlockIdx->uid); - n += tPutI64(p ? p + n : p, pBlockIdx->minKey); - n += tPutI64(p ? p + n : p, pBlockIdx->maxKey); - n += tPutI64v(p ? p + n : p, pBlockIdx->minVersion); - n += tPutI64v(p ? p + n : p, pBlockIdx->maxVersion); n += tPutI64v(p ? p + n : p, pBlockIdx->offset); n += tPutI64v(p ? p + n : p, pBlockIdx->size); @@ -220,10 +207,6 @@ int32_t tGetBlockIdx(uint8_t *p, void *ph) { n += tGetI64(p + n, &pBlockIdx->suid); n += tGetI64(p + n, &pBlockIdx->uid); - n += tGetI64(p + n, &pBlockIdx->minKey); - n += tGetI64(p + n, &pBlockIdx->maxKey); - n += tGetI64v(p + n, &pBlockIdx->minVersion); - n += tGetI64v(p + n, &pBlockIdx->maxVersion); n += tGetI64v(p + n, &pBlockIdx->offset); n += tGetI64v(p + n, &pBlockIdx->size); @@ -921,6 +904,76 @@ _exit: return code; } +int32_t tPutColData(uint8_t *p, SColData *pColData) { + int32_t n = 0; + + n += tPutI16v(p ? p + n : p, pColData->cid); + n += tPutI8(p ? p + n : p, pColData->type); + n += tPutI8(p ? p + n : p, pColData->smaOn); + n += tPutI32v(p ? p + n : p, pColData->nVal); + n += tPutU8(p ? p + n : p, pColData->flag); + + if (pColData->flag == HAS_NONE || pColData->flag == HAS_NULL) goto _exit; + if (pColData->flag != HAS_VALUE) { + // bitmap + + int32_t size = BIT2_SIZE(pColData->nVal); + if (p) { + memcpy(p + n, pColData->pBitMap, size); + } + n += size; + } + if (IS_VAR_DATA_TYPE(pColData->type)) { + // offset + + int32_t size = sizeof(int32_t) * pColData->nVal; + if (p) { + memcpy(p + n, pColData->aOffset, size); + } + n += size; + } + n += tPutI32v(p ? p + n : p, pColData->nData); + if (p) { + memcpy(p + n, pColData->pData, pColData->nData); + } + n += pColData->nData; + +_exit: + return n; +} + +int32_t tGetColData(uint8_t *p, SColData *pColData) { + int32_t n = 0; + + n += tGetI16v(p + n, &pColData->cid); + n += tGetI8(p + n, &pColData->type); + n += tGetI8(p + n, &pColData->smaOn); + n += tGetI32v(p + n, &pColData->nVal); + n += tGetU8(p + n, &pColData->flag); + + if (pColData->flag == HAS_NONE || pColData->flag == HAS_NULL) goto _exit; + if (pColData->flag != HAS_VALUE) { + // bitmap + + int32_t size = BIT2_SIZE(pColData->nVal); + pColData->pBitMap = p + n; + n += size; + } + if (IS_VAR_DATA_TYPE(pColData->type)) { + // offset + + int32_t size = sizeof(int32_t) * pColData->nVal; + pColData->aOffset = (int32_t *)(p + n); + n += size; + } + n += tGetI32v(p + n, &pColData->nData); + pColData->pData = p + n; + n += pColData->nData; + +_exit: + return n; +} + static FORCE_INLINE int32_t tColDataCmprFn(const void *p1, const void *p2) { SColData *pColData1 = (SColData *)p1; SColData *pColData2 = (SColData *)p2; @@ -962,11 +1015,11 @@ void tBlockDataReset(SBlockData *pBlockData) { taosArrayClear(pBlockData->aIdx); } -void tBlockDataClear(SBlockData *pBlockData) { +void tBlockDataClear(SBlockData *pBlockData, int8_t deepClear) { tFree((uint8_t *)pBlockData->aVersion); tFree((uint8_t *)pBlockData->aTSKEY); taosArrayDestroy(pBlockData->aIdx); - taosArrayDestroyEx(pBlockData->aColData, tColDataClear); + taosArrayDestroyEx(pBlockData->aColData, deepClear ? tColDataClear : NULL); } int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema) { @@ -1079,6 +1132,46 @@ _err: return code; } +int32_t tBlockDataCorrectSchema(SBlockData *pBlockData, SBlockData *pBlockDataFrom) { + int32_t code = 0; + + int32_t iColData = 0; + for (int32_t iColDataFrom = 0; iColDataFrom < taosArrayGetSize(pBlockDataFrom->aIdx); iColDataFrom++) { + SColData *pColDataFrom = tBlockDataGetColDataByIdx(pBlockDataFrom, iColDataFrom); + + while (true) { + SColData *pColData; + if (iColData < taosArrayGetSize(pBlockData->aIdx)) { + pColData = tBlockDataGetColDataByIdx(pBlockData, iColData); + } else { + pColData = NULL; + } + + if (pColData == NULL || pColData->cid > pColDataFrom->cid) { + code = tBlockDataAddColData(pBlockData, iColData, &pColData); + if (code) goto _exit; + + tColDataInit(pColData, pColDataFrom->cid, pColDataFrom->type, pColDataFrom->smaOn); + for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { + code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type)); + if (code) goto _exit; + } + + iColData++; + break; + } else if (pColData->cid == pColDataFrom->cid) { + iColData++; + break; + } else { + iColData++; + } + } + } + +_exit: + return code; +} + int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData) { int32_t code = 0; @@ -1239,6 +1332,52 @@ void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColD *ppColData = NULL; } +int32_t tPutBlockData(uint8_t *p, SBlockData *pBlockData) { + int32_t n = 0; + + n += tPutI32v(p ? p + n : p, pBlockData->nRow); + if (p) { + memcpy(p + n, pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow); + } + n = n + sizeof(int64_t) * pBlockData->nRow; + if (p) { + memcpy(p + n, pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow); + } + n = n + sizeof(TSKEY) * pBlockData->nRow; + + int32_t nCol = taosArrayGetSize(pBlockData->aIdx); + n += tPutI32v(p ? p + n : p, nCol); + for (int32_t iCol = 0; iCol < nCol; iCol++) { + SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iCol); + n += tPutColData(p ? p + n : p, pColData); + } + + return n; +} + +int32_t tGetBlockData(uint8_t *p, SBlockData *pBlockData) { + int32_t n = 0; + + tBlockDataReset(pBlockData); + + n += tGetI32v(p + n, &pBlockData->nRow); + pBlockData->aVersion = (int64_t *)(p + n); + n = n + sizeof(int64_t) * pBlockData->nRow; + pBlockData->aTSKEY = (TSKEY *)(p + n); + n = n + sizeof(TSKEY) * pBlockData->nRow; + + int32_t nCol; + n += tGetI32v(p + n, &nCol); + for (int32_t iCol = 0; iCol < nCol; iCol++) { + SColData *pColData; + + if (tBlockDataAddColData(pBlockData, iCol, &pColData)) return -1; + n += tGetColData(p + n, pColData); + } + + return n; +} + // ALGORITHM ============================== void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { SColVal colVal; diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index 27f30ec787..c42c080fb8 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -20,13 +20,13 @@ struct SVSnapReader { SVnode *pVnode; int64_t sver; int64_t ever; + int64_t index; // meta int8_t metaDone; SMetaSnapReader *pMetaReader; // tsdb int8_t tsdbDone; STsdbSnapReader *pTsdbReader; - uint8_t *pData; }; int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapReader **ppReader) { @@ -42,12 +42,7 @@ int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapRe pReader->sver = sver; pReader->ever = ever; - code = metaSnapReaderOpen(pVnode->pMeta, sver, ever, &pReader->pMetaReader); - if (code) goto _err; - - code = tsdbSnapReaderOpen(pVnode->pTsdb, sver, ever, &pReader->pTsdbReader); - if (code) goto _err; - + vInfo("vgId:%d vnode snapshot reader opened, sver:%" PRId64 " ever:%" PRId64, TD_VID(pVnode), sver, ever); *ppReader = pReader; return code; @@ -60,54 +55,85 @@ _err: int32_t vnodeSnapReaderClose(SVSnapReader *pReader) { int32_t code = 0; - tFree(pReader->pData); - if (pReader->pTsdbReader) tsdbSnapReaderClose(&pReader->pTsdbReader); - if (pReader->pMetaReader) metaSnapReaderClose(&pReader->pMetaReader); - taosMemoryFree(pReader); + if (pReader->pTsdbReader) { + tsdbSnapReaderClose(&pReader->pTsdbReader); + } + if (pReader->pMetaReader) { + metaSnapReaderClose(&pReader->pMetaReader); + } + + vInfo("vgId:%d vnode snapshot reader closed", TD_VID(pReader->pVnode)); + taosMemoryFree(pReader); return code; } int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) { int32_t code = 0; + // META ============== if (!pReader->metaDone) { - code = metaSnapRead(pReader->pMetaReader, &pReader->pData); + // open reader if not + if (pReader->pMetaReader == NULL) { + code = metaSnapReaderOpen(pReader->pVnode->pMeta, pReader->sver, pReader->ever, &pReader->pMetaReader); + if (code) goto _err; + } + + code = metaSnapRead(pReader->pMetaReader, ppData); if (code) { - if (code == TSDB_CODE_VND_READ_END) { + goto _err; + } else { + if (*ppData) { + goto _exit; + } else { pReader->metaDone = 1; - } else { - goto _err; + code = metaSnapReaderClose(&pReader->pMetaReader); + if (code) goto _err; } - } else { - *ppData = pReader->pData; - *nData = sizeof(SSnapDataHdr) + ((SSnapDataHdr *)pReader->pData)->size; - goto _exit; } } + // TSDB ============== if (!pReader->tsdbDone) { - code = tsdbSnapRead(pReader->pTsdbReader, &pReader->pData); + // open if not + if (pReader->pTsdbReader == NULL) { + code = tsdbSnapReaderOpen(pReader->pVnode->pTsdb, pReader->sver, pReader->ever, &pReader->pTsdbReader); + if (code) goto _err; + } + + code = tsdbSnapRead(pReader->pTsdbReader, ppData); if (code) { - if (code == TSDB_CODE_VND_READ_END) { - pReader->tsdbDone = 1; - } else { - goto _err; - } + goto _err; } else { - *ppData = pReader->pData; - *nData = sizeof(SSnapDataHdr) + ((SSnapDataHdr *)pReader->pData)->size; - goto _exit; + if (*ppData) { + goto _exit; + } else { + pReader->tsdbDone = 1; + code = tsdbSnapReaderClose(&pReader->pTsdbReader); + if (code) goto _err; + } } } - code = TSDB_CODE_VND_READ_END; + *ppData = NULL; + *nData = 0; _exit: + if (*ppData) { + SSnapDataHdr *pHdr = (SSnapDataHdr *)(*ppData); + + pReader->index++; + *nData = sizeof(SSnapDataHdr) + pHdr->size; + pHdr->index = pReader->index; + vInfo("vgId:%d vnode snapshot read data,index:%" PRId64 " type:%d nData:%d ", TD_VID(pReader->pVnode), + pReader->index, pHdr->type, *nData); + } else { + vInfo("vgId:%d vnode snapshot read data end, index:%" PRId64, TD_VID(pReader->pVnode), pReader->index); + } return code; _err: - vError("vgId:% snapshot read failed since %s", TD_VID(pReader->pVnode), tstrerror(code)); + vError("vgId:% vnode snapshot read failed since %s", TD_VID(pReader->pVnode), tstrerror(code)); return code; } @@ -116,24 +142,13 @@ struct SVSnapWriter { SVnode *pVnode; int64_t sver; int64_t ever; + int64_t index; // meta SMetaSnapWriter *pMetaSnapWriter; // tsdb STsdbSnapWriter *pTsdbSnapWriter; }; -static int32_t vnodeSnapRollback(SVSnapWriter *pWriter) { - int32_t code = 0; - // TODO - return code; -} - -static int32_t vnodeSnapCommit(SVSnapWriter *pWriter) { - int32_t code = 0; - // TODO - return code; -} - int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWriter **ppWriter) { int32_t code = 0; SVSnapWriter *pWriter = NULL; @@ -148,62 +163,78 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr pWriter->sver = sver; pWriter->ever = ever; + vInfo("vgId:%d vnode snapshot writer opened", TD_VID(pVnode)); + *ppWriter = pWriter; return code; _err: vError("vgId:%d vnode snapshot writer open failed since %s", TD_VID(pVnode), tstrerror(code)); + *ppWriter = NULL; return code; } int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback) { int32_t code = 0; - if (rollback) { - code = vnodeSnapRollback(pWriter); - if (code) goto _err; - } else { - code = vnodeSnapCommit(pWriter); + if (pWriter->pMetaSnapWriter) { + code = metaSnapWriterClose(&pWriter->pMetaSnapWriter, rollback); if (code) goto _err; } + if (pWriter->pTsdbSnapWriter) { + code = tsdbSnapWriterClose(&pWriter->pTsdbSnapWriter, rollback); + if (code) goto _err; + } + +_exit: + vInfo("vgId:%d vnode snapshot writer closed, rollback:%d", TD_VID(pWriter->pVnode), rollback); taosMemoryFree(pWriter); return code; _err: - vError("vgId:%d vnode snapshow writer close failed since %s", TD_VID(pWriter->pVnode), tstrerror(code)); + vError("vgId:%d vnode snapshot writer close failed since %s", TD_VID(pWriter->pVnode), tstrerror(code)); return code; } int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { int32_t code = 0; - SSnapDataHdr *pSnapDataHdr = (SSnapDataHdr *)pData; + SSnapDataHdr *pHdr = (SSnapDataHdr *)pData; SVnode *pVnode = pWriter->pVnode; - ASSERT(pSnapDataHdr->size + sizeof(SSnapDataHdr) == nData); + ASSERT(pHdr->size + sizeof(SSnapDataHdr) == nData); + ASSERT(pHdr->index == pWriter->index + 1); + pWriter->index = pHdr->index; - if (pSnapDataHdr->type == 0) { + vInfo("vgId:%d vnode snapshot write data, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode), pHdr->index, + pHdr->type, nData); + + if (pHdr->type == 0) { // meta + if (pWriter->pMetaSnapWriter == NULL) { code = metaSnapWriterOpen(pVnode->pMeta, pWriter->sver, pWriter->ever, &pWriter->pMetaSnapWriter); if (code) goto _err; } - code = metaSnapWrite(pWriter->pMetaSnapWriter, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); + code = metaSnapWrite(pWriter->pMetaSnapWriter, pData, nData); if (code) goto _err; } else { // tsdb + if (pWriter->pTsdbSnapWriter == NULL) { code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, &pWriter->pTsdbSnapWriter); if (code) goto _err; } - code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); + code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pData, nData); if (code) goto _err; } +_exit: return code; _err: - vError("vgId:%d vnode snapshot write failed since %s", TD_VID(pVnode), tstrerror(code)); + vError("vgId:%d vnode snapshot write failed since %s, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode), + tstrerror(code), pHdr->index, pHdr->type, nData); return code; } \ No newline at end of file diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a81a796fa7..8222e2ef40 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -521,8 +521,9 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, (STsdbReader**)&pInfo->dataReader, GET_TASKID(pTaskInfo)); - if (code != 0) { - // TODO + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, code); + return NULL; } } diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 31776610b6..10e0808c4d 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2369,6 +2369,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getHLLFuncEnv, .initFunc = functionSetup, .processFunc = hllFunction, + .sprocessFunc = hllScalarFunction, .finalizeFunc = hllFinalize, .invertFunc = NULL, .combineFunc = hllCombine, @@ -2407,6 +2408,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getDiffFuncEnv, .initFunc = diffFunctionSetup, .processFunc = diffFunction, + .sprocessFunc = diffScalarFunction, .finalizeFunc = functionFinalize }, { @@ -2437,6 +2439,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getCsumFuncEnv, .initFunc = functionSetup, .processFunc = csumFunction, + .sprocessFunc = csumScalarFunction, .finalizeFunc = NULL }, { @@ -2463,7 +2466,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "tail", .type = FUNCTION_TYPE_TAIL, - .classification = FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | + .classification = FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, .translateFunc = translateTail, .getEnvFunc = getTailFuncEnv, @@ -2474,7 +2477,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "unique", .type = FUNCTION_TYPE_UNIQUE, - .classification = FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | + .classification = FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, .translateFunc = translateUnique, .getEnvFunc = getUniqueFuncEnv, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 12b796c5ca..a9a569480d 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2723,6 +2723,9 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { int32_t blockDataOrder = (startKey <= endKey) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; + // please ref. to the comment in lastRowFunction for the reason why disabling the opt version of last/first function. + // we will use this opt implementation in an new version that is only available in scan subplan +#if 0 if (blockDataOrder == TSDB_ORDER_ASC) { // filter according to current result firstly if (pResInfo->numOfRes > 0) { @@ -2770,6 +2773,22 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { } } } +#else + for (int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) { + if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) { + continue; + } + + numOfElems++; + + char* data = colDataGetData(pInputCol, i); + TSKEY cts = getRowPTs(pInput->pPTS, i); + if (pResInfo->numOfRes == 0 || pInfo->ts > cts) { + doSaveCurrentVal(pCtx, i, cts, pInputCol->info.type, data); + pResInfo->numOfRes = 1; + } + } +#endif SET_VAL(pResInfo, numOfElems, 1); return TSDB_CODE_SUCCESS; @@ -2801,6 +2820,8 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { int32_t blockDataOrder = (startKey <= endKey) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; + // please ref. to the comment in lastRowFunction for the reason why disabling the opt version of last/first function. +#if 0 if (blockDataOrder == TSDB_ORDER_ASC) { for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) { if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) { @@ -2833,6 +2854,22 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { break; } } +#else + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { + if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) { + continue; + } + + numOfElems++; + + char* data = colDataGetData(pInputCol, i); + TSKEY cts = getRowPTs(pInput->pPTS, i); + if (pResInfo->numOfRes == 0 || pInfo->ts < cts) { + doSaveCurrentVal(pCtx, i, cts, type, data); + pResInfo->numOfRes = 1; + } + } +#endif SET_VAL(pResInfo, numOfElems, 1); return TSDB_CODE_SUCCESS; @@ -2988,6 +3025,9 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) { int32_t blockDataOrder = (startKey <= endKey) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; +#if 0 + // the optimized version only function if all tuples in one block are monotonious increasing or descreasing. + // this is NOT always works if project operator exists in downstream. if (blockDataOrder == TSDB_ORDER_ASC) { for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) { char* data = colDataGetData(pInputCol, i); @@ -2997,6 +3037,7 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) { if (pResInfo->numOfRes == 0 || pInfo->ts < cts) { doSaveLastrow(pCtx, data, i, cts, pInfo); } + break; } } else { // descending order @@ -3011,7 +3052,19 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) { break; } } +#else + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { + char* data = colDataGetData(pInputCol, i); + TSKEY cts = getRowPTs(pInput->pPTS, i); + numOfElems++; + if (pResInfo->numOfRes == 0 || pInfo->ts < cts) { + doSaveLastrow(pCtx, data, i, cts, pInfo); + pResInfo->numOfRes = 1; + } + } + +#endif SET_VAL(pResInfo, numOfElems, 1); return TSDB_CODE_SUCCESS; } @@ -5926,6 +5979,7 @@ int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx) { TSKEY cts = getRowPTs(pInput->pPTS, i); if (pResInfo->numOfRes == 0 || pInfo->ts < cts) { doSaveLastrow(pCtx, data, i, cts, pInfo); + pResInfo->numOfRes = 1; } } diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 9687528136..3f26cd46f8 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -1746,20 +1746,14 @@ int32_t countScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam SColumnInfoData *pOutputData = pOutput->columnData; int64_t *out = (int64_t *)pOutputData->pData; - bool hasNull = false; *out = 0; for (int32_t i = 0; i < pInput->numOfRows; ++i) { if (colDataIsNull_s(pInputData, i)) { - hasNull = true; - break; + continue; } (*out)++; } - if (hasNull) { - colDataAppendNULL(pOutputData, 0); - } - pOutput->numOfRows = 1; return TSDB_CODE_SUCCESS; } @@ -2414,6 +2408,10 @@ int32_t irateScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam return nonCalcScalarFunction(pInput, inputNum, pOutput); } +int32_t diffScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + return nonCalcScalarFunction(pInput, inputNum, pOutput); +} + int32_t twaScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { return avgScalarFunction(pInput, inputNum, pOutput); } @@ -2421,3 +2419,11 @@ int32_t twaScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam * int32_t mavgScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { return avgScalarFunction(pInput, inputNum, pOutput); } + +int32_t hllScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + return countScalarFunction(pInput, inputNum, pOutput); +} + +int32_t csumScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + return sumScalarFunction(pInput, inputNum, pOutput); +} diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 1f26033f63..0cbd7d36b2 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -223,6 +223,7 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode); // snapshot -------------- bool syncNodeHasSnapshot(SSyncNode* pSyncNode); +void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode); SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode); SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index dca80f6826..9678b335fd 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -711,6 +711,9 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc syncNodeEventLog(ths, logBuf); } while (0); + // maybe update commit index by snapshot + syncNodeMaybeUpdateCommitBySnapshot(ths); + // prepare response msg SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); pReply->srcId = ths->myRaftId; @@ -718,7 +721,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc pReply->term = ths->pRaftStore->currentTerm; pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; pReply->success = false; - pReply->matchIndex = SYNC_INDEX_INVALID; + pReply->matchIndex = ths->commitIndex; // msg event log do { diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index c18c2b4d38..5137922522 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -147,6 +147,15 @@ static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, Sync int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { int32_t ret = 0; + // print log + do { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, term:%lu, match:%ld, success:%d", pMsg->term, + pMsg->matchIndex, pMsg->success); + syncNodeEventLog(ths, logBuf); + + } while (0); + // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { syncNodeEventLog(ths, "recv sync-append-entries-reply, maybe replica already dropped"); @@ -238,7 +247,14 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie SSnapshot oldSnapshot; ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &oldSnapshot); SyncTerm newSnapshotTerm = oldSnapshot.lastApplyTerm; - syncNodeStartSnapshotOnce(ths, SYNC_INDEX_BEGIN, nextIndex, newSnapshotTerm, pMsg); + + SyncIndex endIndex; + if (ths->pLogStore->syncLogExist(ths->pLogStore, nextIndex + 1)) { + endIndex = nextIndex; + } else { + endIndex = oldSnapshot.lastApplyIndex; + } + syncNodeStartSnapshotOnce(ths, pMsg->matchIndex + 1, endIndex, newSnapshotTerm, pMsg); // get sender SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId)); @@ -256,6 +272,11 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie } syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex); + SyncIndex oldMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); + if (pMsg->matchIndex > oldMatchIndex) { + syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex); + } + // event log, update next-index do { char host[64]; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index b6c01f2923..bb7454ea6f 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1083,6 +1083,17 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { return pSyncNode; } +void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) { + if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { + SSnapshot snapshot; + int32_t code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); + ASSERT(code == 0); + if (snapshot.lastApplyIndex > pSyncNode->commitIndex) { + pSyncNode->commitIndex = snapshot.lastApplyIndex; + } + } +} + void syncNodeStart(SSyncNode* pSyncNode) { // start raft if (pSyncNode->replicaNum == 1) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index cc5c9eb651..b323887d93 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -336,8 +336,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_NOT_EXIST, "Table does not exists TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TABLE_ACTION, "Invalid table action") TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_ALREADY_EXISTS, "Table column already exists") TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_COL_NOT_EXISTS, "Table column not exists") -TAOS_DEFINE_ERROR(TSDB_CODE_VND_READ_END, "Read end") - // tsdb TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID") diff --git a/tests/pytest/util/sql.py b/tests/pytest/util/sql.py index 585594e035..4961355f06 100644 --- a/tests/pytest/util/sql.py +++ b/tests/pytest/util/sql.py @@ -79,22 +79,39 @@ class TDSql: self.queryResult = None tdLog.info("sql:%s, expect error occured" % (sql)) - def query(self, sql, row_tag=None): + def query(self, sql, row_tag=None,queyTimes=10): self.sql = sql - try: - self.cursor.execute(sql) - self.queryResult = self.cursor.fetchall() - self.queryRows = len(self.queryResult) - self.queryCols = len(self.cursor.description) - except Exception as e: - caller = inspect.getframeinfo(inspect.stack()[1][0]) - args = (caller.filename, caller.lineno, sql, repr(e)) - tdLog.notice("%s(%d) failed: sql:%s, %s" % args) - traceback.print_exc() - raise Exception(repr(e)) - if row_tag: - return self.queryResult - return self.queryRows + i=1 + while i <= queyTimes: + try: + self.cursor.execute(sql) + self.queryResult = self.cursor.fetchall() + self.queryRows = len(self.queryResult) + self.queryCols = len(self.cursor.description) + if row_tag: + return self.queryResult + return self.queryRows + except Exception as e: + i+=1 + tdLog.notice("Try to query again, query times: %d "%i) + pass + else: + try: + tdLog.notice("Try the last query ") + self.cursor.execute(sql) + self.queryResult = self.cursor.fetchall() + self.queryRows = len(self.queryResult) + self.queryCols = len(self.cursor.description) + if row_tag: + return self.queryResult + return self.queryRows + except Exception as e: + caller = inspect.getframeinfo(inspect.stack()[1][0]) + args = (caller.filename, caller.lineno, sql, repr(e)) + tdLog.notice("%s(%d) failed: sql:%s, %s" % args) + traceback.print_exc() + raise Exception(repr(e)) + def is_err_sql(self, sql): err_flag = True @@ -266,16 +283,27 @@ class TDSql: time.sleep(1) continue - def execute(self, sql): + def execute(self, sql,queyTimes=10): self.sql = sql - try: - self.affectedRows = self.cursor.execute(sql) - except Exception as e: - caller = inspect.getframeinfo(inspect.stack()[1][0]) - args = (caller.filename, caller.lineno, sql, repr(e)) - tdLog.notice("%s(%d) failed: sql:%s, %s" % args) - raise Exception(repr(e)) - return self.affectedRows + i=1 + while i <= queyTimes: + try: + self.affectedRows = self.cursor.execute(sql) + return self.affectedRows + except Exception as e: + i+=1 + tdLog.notice("Try to execute sql again, query times: %d "%i) + pass + else: + try: + tdLog.notice("Try the last execute sql ") + self.affectedRows = self.cursor.execute(sql) + return self.affectedRows + except Exception as e: + caller = inspect.getframeinfo(inspect.stack()[1][0]) + args = (caller.filename, caller.lineno, sql, repr(e)) + tdLog.notice("%s(%d) failed: sql:%s, %s" % args) + raise Exception(repr(e)) def checkAffectedRows(self, expectAffectedRows): if self.affectedRows != expectAffectedRows: diff --git a/tests/script/tsim/sync/3Replica5VgElect3mnodedrop.sim b/tests/script/tsim/sync/3Replica5VgElect3mnodedrop.sim index d84679add1..960f579f55 100644 --- a/tests/script/tsim/sync/3Replica5VgElect3mnodedrop.sim +++ b/tests/script/tsim/sync/3Replica5VgElect3mnodedrop.sim @@ -91,21 +91,21 @@ if $rows != $vgroups then return -1 endi -if $data[0][4] == LEADER then - if $data[0][6] == FOLLOWER then - if $data[0][8] == FOLLOWER then +if $data[0][4] == leader then + if $data[0][6] == follower then + if $data[0][8] == follower then print ---- vgroup $data[0][0] leader locate on dnode $data[0][3] endi endi -elif $data[0][6] == LEADER then - if $data[0][4] == FOLLOWER then - if $data[0][8] == FOLLOWER then +elif $data[0][6] == leader then + if $data[0][4] == follower then + if $data[0][8] == follower then print ---- vgroup $data[0][0] leader locate on dnode $data[0][5] endi endi -elif $data[0][8] == LEADER then - if $data[0][4] == FOLLOWER then - if $data[0][6] == FOLLOWER then +elif $data[0][8] == leader then + if $data[0][4] == follower then + if $data[0][6] == follower then print ---- vgroup $data[0][0] leader locate on dnode $data[0][7] endi endi @@ -113,21 +113,21 @@ else goto check_vg_ready endi -if $data[1][4] == LEADER then - if $data[1][6] == FOLLOWER then - if $data[1][8] == FOLLOWER then +if $data[1][4] == leader then + if $data[1][6] == follower then + if $data[1][8] == follower then print ---- vgroup $data[1][0] leader locate on dnode $data[1][3] endi endi -elif $data[1][6] == LEADER then - if $data[1][4] == FOLLOWER then - if $data[1][8] == FOLLOWER then +elif $data[1][6] == leader then + if $data[1][4] == follower then + if $data[1][8] == follower then print ---- vgroup $data[1][0] leader locate on dnode $data[1][5] endi endi -elif $data[1][8] == LEADER then - if $data[1][4] == FOLLOWER then - if $data[1][6] == FOLLOWER then +elif $data[1][8] == leader then + if $data[1][4] == follower then + if $data[1][6] == follower then print ---- vgroup $data[1][0] leader locate on dnode $data[1][7] endi endi @@ -135,21 +135,21 @@ else goto check_vg_ready endi -if $data[2][4] == LEADER then - if $data[2][6] == FOLLOWER then - if $data[2][8] == FOLLOWER then +if $data[2][4] == leader then + if $data[2][6] == follower then + if $data[2][8] == follower then print ---- vgroup $data[2][0] leader locate on dnode $data[2][3] endi endi -elif $data[2][6] == LEADER then - if $data[2][4] == FOLLOWER then - if $data[2][8] == FOLLOWER then +elif $data[2][6] == leader then + if $data[2][4] == follower then + if $data[2][8] == follower then print ---- vgroup $data[2][0] leader locate on dnode $data[2][5] endi endi -elif $data[2][8] == LEADER then - if $data[2][4] == FOLLOWER then - if $data[2][6] == FOLLOWER then +elif $data[2][8] == leader then + if $data[2][4] == follower then + if $data[2][6] == follower then print ---- vgroup $data[2][0] leader locate on dnode $data[2][7] endi endi @@ -157,21 +157,21 @@ else goto check_vg_ready endi -if $data[3][4] == LEADER then - if $data[3][6] == FOLLOWER then - if $data[3][8] == FOLLOWER then +if $data[3][4] == leader then + if $data[3][6] == follower then + if $data[3][8] == follower then print ---- vgroup $data[3][0] leader locate on dnode $data[3][3] endi endi -elif $data[3][6] == LEADER then - if $data[3][4] == FOLLOWER then - if $data[3][8] == FOLLOWER then +elif $data[3][6] == leader then + if $data[3][4] == follower then + if $data[3][8] == follower then print ---- vgroup $data[3][0] leader locate on dnode $data[3][5] endi endi -elif $data[3][8] == LEADER then - if $data[3][4] == FOLLOWER then - if $data[3][6] == FOLLOWER then +elif $data[3][8] == leader then + if $data[3][4] == follower then + if $data[3][6] == follower then print ---- vgroup $data[3][0] leader locate on dnode $data[3][7] endi endi @@ -179,21 +179,21 @@ else goto check_vg_ready endi -if $data[4][4] == LEADER then - if $data[4][6] == FOLLOWER then - if $data[4][8] == FOLLOWER then +if $data[4][4] == leader then + if $data[4][6] == follower then + if $data[4][8] == follower then print ---- vgroup $data[4][0] leader locate on dnode $data[4][3] endi endi -elif $data[4][6] == LEADER then - if $data[4][4] == FOLLOWER then - if $data[4][8] == FOLLOWER then +elif $data[4][6] == leader then + if $data[4][4] == follower then + if $data[4][8] == follower then print ---- vgroup $data[4][0] leader locate on dnode $data[4][5] endi endi -elif $data[4][8] == LEADER then - if $data[4][4] == FOLLOWER then - if $data[4][6] == FOLLOWER then +elif $data[4][8] == leader then + if $data[4][4] == follower then + if $data[4][6] == follower then print ---- vgroup $data[4][0] leader locate on dnode $data[4][7] endi endi @@ -286,13 +286,13 @@ if $data[0][0] != 1 then return -1 endi -if $data[0][2] != LEADER then +if $data[0][2] != leader then goto check_mnode_ready_2 endi -if $data[1][2] != FOLLOWER then +if $data[1][2] != follower then goto check_mnode_ready_2 endi -if $data[2][2] != FOLLOWER then +if $data[2][2] != follower then goto check_mnode_ready_2 endi @@ -318,21 +318,21 @@ if $rows != $vgroups then return -1 endi -if $data[0][4] == LEADER then - if $data[0][6] == FOLLOWER then - if $data[0][8] == FOLLOWER then +if $data[0][4] == leader then + if $data[0][6] == follower then + if $data[0][8] == follower then print ---- vgroup $data[0][0] leader locate on dnode $data[0][3] endi endi -elif $data[0][6] == LEADER then - if $data[0][4] == FOLLOWER then - if $data[0][8] == FOLLOWER then +elif $data[0][6] == leader then + if $data[0][4] == follower then + if $data[0][8] == follower then print ---- vgroup $data[0][0] leader locate on dnode $data[0][5] endi endi -elif $data[0][8] == LEADER then - if $data[0][4] == FOLLOWER then - if $data[0][6] == FOLLOWER then +elif $data[0][8] == leader then + if $data[0][4] == follower then + if $data[0][6] == follower then print ---- vgroup $data[0][0] leader locate on dnode $data[0][7] endi endi @@ -340,21 +340,21 @@ else goto check_vg_ready1 endi -if $data[1][4] == LEADER then - if $data[1][6] == FOLLOWER then - if $data[1][8] == FOLLOWER then +if $data[1][4] == leader then + if $data[1][6] == follower then + if $data[1][8] == follower then print ---- vgroup $data[1][0] leader locate on dnode $data[1][3] endi endi -elif $data[1][6] == LEADER then - if $data[1][4] == FOLLOWER then - if $data[1][8] == FOLLOWER then +elif $data[1][6] == leader then + if $data[1][4] == follower then + if $data[1][8] == follower then print ---- vgroup $data[1][0] leader locate on dnode $data[1][5] endi endi -elif $data[1][8] == LEADER then - if $data[1][4] == FOLLOWER then - if $data[1][6] == FOLLOWER then +elif $data[1][8] == leader then + if $data[1][4] == follower then + if $data[1][6] == follower then print ---- vgroup $data[1][0] leader locate on dnode $data[1][7] endi endi @@ -362,21 +362,21 @@ else goto check_vg_ready1 endi -if $data[2][4] == LEADER then - if $data[2][6] == FOLLOWER then - if $data[2][8] == FOLLOWER then +if $data[2][4] == leader then + if $data[2][6] == follower then + if $data[2][8] == follower then print ---- vgroup $data[2][0] leader locate on dnode $data[2][3] endi endi -elif $data[2][6] == LEADER then - if $data[2][4] == FOLLOWER then - if $data[2][8] == FOLLOWER then +elif $data[2][6] == leader then + if $data[2][4] == follower then + if $data[2][8] == follower then print ---- vgroup $data[2][0] leader locate on dnode $data[2][5] endi endi -elif $data[2][8] == LEADER then - if $data[2][4] == FOLLOWER then - if $data[2][6] == FOLLOWER then +elif $data[2][8] == leader then + if $data[2][4] == follower then + if $data[2][6] == follower then print ---- vgroup $data[2][0] leader locate on dnode $data[2][7] endi endi @@ -384,21 +384,21 @@ else goto check_vg_ready1 endi -if $data[3][4] == LEADER then - if $data[3][6] == FOLLOWER then - if $data[3][8] == FOLLOWER then +if $data[3][4] == leader then + if $data[3][6] == follower then + if $data[3][8] == follower then print ---- vgroup $data[3][0] leader locate on dnode $data[3][3] endi endi -elif $data[3][6] == LEADER then - if $data[3][4] == FOLLOWER then - if $data[3][8] == FOLLOWER then +elif $data[3][6] == leader then + if $data[3][4] == follower then + if $data[3][8] == follower then print ---- vgroup $data[3][0] leader locate on dnode $data[3][5] endi endi -elif $data[3][8] == LEADER then - if $data[3][4] == FOLLOWER then - if $data[3][6] == FOLLOWER then +elif $data[3][8] == leader then + if $data[3][4] == follower then + if $data[3][6] == follower then print ---- vgroup $data[3][0] leader locate on dnode $data[3][7] endi endi @@ -406,21 +406,21 @@ else goto check_vg_ready1 endi -if $data[4][4] == LEADER then - if $data[4][6] == FOLLOWER then - if $data[4][8] == FOLLOWER then +if $data[4][4] == leader then + if $data[4][6] == follower then + if $data[4][8] == follower then print ---- vgroup $data[4][0] leader locate on dnode $data[4][3] endi endi -elif $data[4][6] == LEADER then - if $data[4][4] == FOLLOWER then - if $data[4][8] == FOLLOWER then +elif $data[4][6] == leader then + if $data[4][4] == follower then + if $data[4][8] == follower then print ---- vgroup $data[4][0] leader locate on dnode $data[4][5] endi endi -elif $data[4][8] == LEADER then - if $data[4][4] == FOLLOWER then - if $data[4][6] == FOLLOWER then +elif $data[4][8] == leader then + if $data[4][4] == follower then + if $data[4][6] == follower then print ---- vgroup $data[4][0] leader locate on dnode $data[4][7] endi endi @@ -539,27 +539,27 @@ if $data[0][0] != 1 then return -1 endi -if $data[0][2] == LEADER then - if $data[1][2] != FOLLOWER then +if $data[0][2] == leader then + if $data[1][2] != follower then goto check_mnode_ready_3 endi - if $data[2][2] != FOLLOWER then + if $data[2][2] != follower then goto check_mnode_ready_3 endi endi -if $data[1][2] == LEADER then - if $data[0][2] != FOLLOWER then +if $data[1][2] == leader then + if $data[0][2] != follower then goto check_mnode_ready_3 endi - if $data[2][2] != FOLLOWER then + if $data[2][2] != follower then goto check_mnode_ready_3 endi endi -if $data[2][2] == LEADER then - if $data[1][2] != FOLLOWER then +if $data[2][2] == leader then + if $data[1][2] != follower then goto check_mnode_ready_3 endi - if $data[0][2] != FOLLOWER then + if $data[0][2] != follower then goto check_mnode_ready_3 endi endi diff --git a/tests/script/tsim/sync/vnodesnapshot-test.sim b/tests/script/tsim/sync/vnodesnapshot-test.sim index 8589078b50..2f0eccf02e 100644 --- a/tests/script/tsim/sync/vnodesnapshot-test.sim +++ b/tests/script/tsim/sync/vnodesnapshot-test.sim @@ -170,84 +170,3 @@ if $rows != 100 then return -1 endi -system sh/exec.sh -n dnode1 -s stop -x SIGINT -system sh/exec.sh -n dnode2 -s stop -x SIGINT -system sh/exec.sh -n dnode3 -s stop -x SIGINT -system sh/exec.sh -n dnode4 -s stop -x SIGINT -######################################################## - - -######################################################## -print ===> start dnode1 dnode3 dnode4 -system sh/exec.sh -n dnode1 -s start -#system sh/exec.sh -n dnode2 -s start -system sh/exec.sh -n dnode3 -s start -system sh/exec.sh -n dnode4 -s start - -sleep 7000 - -print =============== query data -sql connect -sql use db -sql select * from ct1 -print rows: $rows -print $data00 $data01 $data02 -if $rows != 100 then - return -1 -endi - -system sh/exec.sh -n dnode1 -s stop -x SIGINT -#system sh/exec.sh -n dnode2 -s stop -x SIGINT -system sh/exec.sh -n dnode3 -s stop -x SIGINT -system sh/exec.sh -n dnode4 -s stop -x SIGINT -######################################################## - - -######################################################## -print ===> start dnode1 dnode2 dnode4 -system sh/exec.sh -n dnode1 -s start -system sh/exec.sh -n dnode2 -s start -#system sh/exec.sh -n dnode3 -s start -system sh/exec.sh -n dnode4 -s start - -sleep 3000 - -print =============== query data -sql select * from ct1 -print rows: $rows -print $data00 $data01 $data02 -if $rows != 100 then - return -1 -endi - -system sh/exec.sh -n dnode1 -s stop -x SIGINT -system sh/exec.sh -n dnode2 -s stop -x SIGINT -#system sh/exec.sh -n dnode3 -s stop -x SIGINT -system sh/exec.sh -n dnode4 -s stop -x SIGINT -######################################################## - - -######################################################## -print ===> start dnode1 dnode2 dnode3 -system sh/exec.sh -n dnode1 -s start -system sh/exec.sh -n dnode2 -s start -system sh/exec.sh -n dnode3 -s start -#system sh/exec.sh -n dnode4 -s start - -sleep 3000 - -print =============== query data -sql select * from ct1 -print rows: $rows -print $data00 $data01 $data02 -if $rows != 100 then - return -1 -endi - -system sh/exec.sh -n dnode1 -s stop -x SIGINT -system sh/exec.sh -n dnode2 -s stop -x SIGINT -system sh/exec.sh -n dnode3 -s stop -x SIGINT -#system sh/exec.sh -n dnode4 -s stop -x SIGINT -######################################################## - - diff --git a/tests/script/tsim/valgrind/basic3.sim b/tests/script/tsim/valgrind/basic3.sim index 7fba66c468..6a42a8eb7f 100644 --- a/tests/script/tsim/valgrind/basic3.sim +++ b/tests/script/tsim/valgrind/basic3.sim @@ -1,13 +1,10 @@ system sh/stop_dnodes.sh system sh/deploy.sh -n dnode1 -i 1 -system sh/deploy.sh -n dnode2 -i 2 +system sh/cfg.sh -n dnode1 -c debugflag -v 131 system sh/exec.sh -n dnode1 -s start -v -system sh/exec.sh -n dnode2 -s start -v sql connect -print =============== add dnode2 into cluster -sql create dnode $hostname port 7200 - +print =============== step1: create drop show dnodes $x = 0 step1: $x = $x + 1 @@ -18,135 +15,59 @@ step1: endi sql show dnodes print ---> $data00 $data01 $data02 $data03 $data04 $data05 -print ---> $data10 $data11 $data12 $data13 $data14 $data15 -if $rows != 2 then +if $rows != 1 then return -1 endi if $data(1)[4] != ready then goto step1 endi -if $data(2)[4] != ready then - goto step1 -endi - -print =============== create database, stable, table -sql create database db vgroups 3 -sql use db -sql create table stb (ts timestamp, c int) tags (t int) -sql create table t0 using stb tags (0) -sql create table tba (ts timestamp, c1 binary(10), c2 nchar(10)); - -print =============== run show xxxx -sql show dnodes -if $rows != 2 then - return -1 -endi - -sql show mnodes -if $rows != 1 then - return -1 -endi +print =============== step2: create db +sql create database d1 vgroups 2 buffer 3 sql show databases -if $rows != 3 then - return -1 -endi +sql use d1 +sql show vgroups +print =============== step3: create show stable +sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int unsigned) sql show stables if $rows != 1 then return -1 endi +print =============== step4: create show table +sql create table ct1 using stb tags(1000) +sql create table ct2 using stb tags(2000) +sql create table ct3 using stb tags(3000) sql show tables -if $rows != 2 then - return -1 -endi - -sql show users -if $rows != 1 then - return -1 -endi - -sql show vgroups if $rows != 3 then return -1 endi -print =============== run select * from information_schema.xxxx -sql select * from information_schema.`dnodes` -if $rows != 2 then - return -1 -endi +print =============== step5: insert data +sql insert into ct1 values(now+0d, 10, 2.0, 3.0) +sql insert into ct1 values(now+1d, 11, 2.1, 3.1)(now+2d, -12, -2.2, -3.2)(now+3d, -13, -2.3, -3.3) +sql insert into ct2 values(now+0d, 10, 2.0, 3.0) +sql insert into ct2 values(now+1d, 11, 2.1, 3.1)(now+2d, -12, -2.2, -3.2)(now+3d, -13, -2.3, -3.3) +sql insert into ct3 values('2022-01-01 00:00:00.000', 10, 2.0, 3.0) -sql select * from information_schema.`mnodes` -if $rows != 1 then - return -1 -endi +print =============== step6: query data +sql select * from ct1 where ts < now -1d and ts > now +1d +sql select * from stb where ts < now -1d and ts > now +1d +sql select * from ct1 where ts < now -1d and ts > now +1d order by ts desc +sql select * from stb where ts < now -1d and ts > now +1d order by ts desc -sql select * from information_schema.user_databases -if $rows != 3 then - return -1 -endi - -sql select * from information_schema.user_stables -if $rows != 1 then - return -1 -endi - -sql select * from information_schema.user_tables -if $rows != 31 then - return -1 -endi - -sql select * from information_schema.user_users -if $rows != 1 then - return -1 -endi - -sql select * from information_schema.`vgroups` -if $rows != 3 then - return -1 -endi - -sql show variables; -if $rows != 4 then - return -1 -endi - -sql show dnode 1 variables; -if $rows <= 0 then - return -1 -endi - -sql show local variables; -if $rows <= 0 then - return -1 -endi - -print ==== stop dnode1 and dnode2, and restart dnodes +_OVER: system sh/exec.sh -n dnode1 -s stop -x SIGINT -system sh/exec.sh -n dnode2 -s stop -x SIGINT +print =============== check +$null= -print =============== check dnode1 system_content sh/checkValgrind.sh -n dnode1 print cmd return result ----> [ $system_content ] -if $system_content <= 0 then - return 0 +if $system_content > 1 then + return -1 endi -$null= if $system_content == $null then - return 0 -endi - -print =============== check dnode2 -system_content sh/checkValgrind.sh -n dnode2 -print cmd return result ----> [ $system_content ] -if $system_content <= 0 then - return 0 -endi - -$null= -if $system_content == $null then - return 0 + return -1 endi diff --git a/tests/script/tsim/valgrind/checkError2.sim b/tests/script/tsim/valgrind/checkError2.sim index 3a82218015..9746fe9ecf 100644 --- a/tests/script/tsim/valgrind/checkError2.sim +++ b/tests/script/tsim/valgrind/checkError2.sim @@ -23,7 +23,7 @@ if $data(1)[4] != ready then endi print =============== step2: create db -sql create database d1 vgroups 1 buffer 3 +sql create database d1 vgroups 2 buffer 3 sql show databases sql use d1 sql show vgroups @@ -44,24 +44,32 @@ if $rows != 3 then return -1 endi -print =============== step5: insert data +print =============== step5: insert data (null / update) sql insert into ct1 values(now+0s, 10, 2.0, 3.0) -sql insert into ct1 values(now+1s, 11, 2.1, 3.1)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3) +sql insert into ct1 values(now+1s, 11, 2.1, NULL)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3) sql insert into ct2 values(now+0s, 10, 2.0, 3.0) sql insert into ct2 values(now+1s, 11, 2.1, 3.1)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3) -sql insert into ct3 values('2021-01-01 00:00:00.000', 10, 2.0, 3.0) +sql insert into ct3 values('2021-01-01 00:00:00.000', NULL, NULL, 3.0) +sql insert into ct3 values('2022-03-02 16:59:00.010', 3 , 4, 5), ('2022-03-02 16:59:00.010', 33 , 4, 5), ('2022-04-01 16:59:00.011', 4, 4, 5), ('2022-04-01 16:59:00.011', 6, 4, 5), ('2022-03-06 16:59:00.013', 8, 4, 5); +sql insert into ct3 values('2022-03-02 16:59:00.010', 103, 1, 2), ('2022-03-02 16:59:00.010', 303, 3, 4), ('2022-04-01 16:59:00.011', 40, 5, 6), ('2022-04-01 16:59:00.011', 60, 4, 5), ('2022-03-06 16:59:00.013', 80, 4, 5); print =============== step6: query data sql select * from ct1 sql select * from stb sql select c1, c2, c3 from ct1 sql select ts, c1, c2, c3 from stb +sql select * from ct1 where ts < now -1d and ts > now +1d +sql select * from stb where ts < now -1d and ts > now +1d +#sql select * from ct1 where ts < now -1d and ts > now +1d order by ts desc +#sql select * from stb where ts < now -1d and ts > now +1d order by ts desc print =============== step7: count sql select count(*) from ct1; sql select count(*) from stb; sql select count(ts), count(c1), count(c2), count(c3) from ct1 sql select count(ts), count(c1), count(c2), count(c3) from stb +sql select count(*) from ct1 where ts < now -1d and ts > now +1d +sql select count(*) from stb where ts < now -1d and ts > now +1d print =============== step8: func sql select first(ts), first(c1), first(c2), first(c3) from ct1 @@ -69,6 +77,20 @@ sql select min(c1), min(c2), min(c3) from ct1 sql select max(c1), max(c2), max(c3) from ct1 sql select sum(c1), sum(c2), sum(c3) from ct1 +print =============== step9: insert select +#sql create table ct4 using stb tags(4000); +#sql insert into ct4 select * from ct1; +#sql select * from ct4; +#sql insert into ct4 select ts,c1,c2,c3 from stb; + +#sql create table tb1 (ts timestamp, c1 int, c2 float, c3 double); +#sql insert into tb1 (ts, c1, c2, c3) select * from ct1; +#sql select * from tb1; + +#sql create table tb2 (ts timestamp, f1 binary(10), c1 int, c2 double); +#sql insert into tb2 (c2, c1, ts) select c2+1, c1, ts+3 from ct2; +#sql select * from tb2; + _OVER: system sh/exec.sh -n dnode1 -s stop -x SIGINT print =============== check diff --git a/tests/system-test/2-query/db.py b/tests/system-test/2-query/db.py new file mode 100644 index 0000000000..a4d603bada --- /dev/null +++ b/tests/system-test/2-query/db.py @@ -0,0 +1,63 @@ +import taos +import sys +import datetime +import inspect + +from util.log import * +from util.sql import * +from util.cases import * +import random + + +class TDTestCase: + updatecfgDict = {'debugFlag': 143, "cDebugFlag": 143, "uDebugFlag": 143, "rpcDebugFlag": 143, "tmrDebugFlag": 143, + "jniDebugFlag": 143, "simDebugFlag": 143, "dDebugFlag": 143, "dDebugFlag": 143, "vDebugFlag": 143, "mDebugFlag": 143, "qDebugFlag": 143, + "wDebugFlag": 143, "sDebugFlag": 143, "tsdbDebugFlag": 143, "tqDebugFlag": 143, "fsDebugFlag": 143, "udfDebugFlag": 143} + + def init(self, conn, logSql): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), False) + + def case1(self): + tdSql.execute("create database if not exists dbms precision 'ms'") + tdSql.execute("create database if not exists dbus precision 'us'") + tdSql.execute("create database if not exists dbns precision 'ns'") + + tdSql.execute("create table dbms.ntb (ts timestamp, c1 int, c2 bigint)") + tdSql.execute("create table dbus.ntb (ts timestamp, c1 int, c2 bigint)") + tdSql.execute("create table dbns.ntb (ts timestamp, c1 int, c2 bigint)") + + tdSql.execute("insert into dbms.ntb values ('2022-01-01 08:00:00.001', 1, 2)") + tdSql.execute("insert into dbms.ntb values ('2022-01-01 08:00:00.002', 3, 4)") + + tdSql.execute("insert into dbus.ntb values ('2022-01-01 08:00:00.000001', 1, 2)") + tdSql.execute("insert into dbus.ntb values ('2022-01-01 08:00:00.000002', 3, 4)") + + tdSql.execute("insert into dbns.ntb values ('2022-01-01 08:00:00.000000001', 1, 2)") + tdSql.execute("insert into dbns.ntb values ('2022-01-01 08:00:00.000000002', 3, 4)") + + tdSql.query("select count(c1) from dbms.ntb interval(1a)") + tdSql.checkRows(2) + + tdSql.query("select count(c1) from dbus.ntb interval(1u)") + tdSql.checkRows(2) + + tdSql.query("select count(c1) from dbns.ntb interval(1b)") + tdSql.checkRows(2) + + def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring + tdSql.prepare() + + tdLog.printNoPrefix("==========start case1 run ...............") + + self.case1() + + tdLog.printNoPrefix("==========end case1 run ...............") + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/2-query/queryQnode.py b/tests/system-test/2-query/queryQnode.py index 8bdb99deec..b5e2bd3328 100644 --- a/tests/system-test/2-query/queryQnode.py +++ b/tests/system-test/2-query/queryQnode.py @@ -392,6 +392,31 @@ class TDTestCase: tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;") assert unionallQnode==tdSql.queryResult + queryPolicy=1 + + tdSql.execute('alter local "queryPolicy" "%d"'%queryPolicy) + tdSql.query("show local variables;") + for i in range(tdSql.queryRows): + if tdSql.queryResult[i][0] == "queryPolicy" : + if int(tdSql.queryResult[i][1]) == int(queryPolicy): + tdLog.success('alter queryPolicy to %d successfully'%queryPolicy) + else : + tdLog.debug(tdSql.queryResult) + tdLog.exit("alter queryPolicy to %d failed"%queryPolicy) + tdSql.execute("reset query cache") + + tdSql.execute("use db1;") + tdSql.query("show dnodes;") + dnodeId=tdSql.getData(0,0) + tdSql.query("select max(c1) from stb10;") + assert maxQnode==tdSql.getData(0,0) + tdSql.query("select min(c1) from stb11;") + assert minQnode==tdSql.getData(0,0) + tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union select c0,c1 from stb11_1 where c0>2000;") + assert unionQnode==tdSql.queryResult + tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;") + assert unionallQnode==tdSql.queryResult + # test case : queryPolicy = 2 def test_case2(self): self.taosBenchCreate("127.0.0.1","no","db1", "stb1", 10, 2, 1*10) diff --git a/tests/system-test/2-query/tsbsQuery.py b/tests/system-test/2-query/tsbsQuery.py index 1e017bb39b..664bc8dc5b 100644 --- a/tests/system-test/2-query/tsbsQuery.py +++ b/tests/system-test/2-query/tsbsQuery.py @@ -40,9 +40,9 @@ class TDTestCase: f"insert into rct{j} values ( {ts+i*10000}, {80+i}, {90+i}, {85+i}, {30+i*10}, {1.2*i}, {221+i*2}, {20+i*0.2}, {1500+i*20}, {150+i*2},{5+i} )" ) tdSql.execute( - f"insert into dct{j} values ( {ts+i*10000}, {1+i*0.1},{1400+i*15}, {1+i},{1500+i*20}, {150+i*2},{5+i} )" + f"insert into dct{j} values ( {ts+i*10000}, {1+i*0.1},{1400+i*15}, {i},{1500+i*20}, {150+i*2},{5+i} )" ) - + tdSql.execute("insert into dct9 (ts,fuel_state) values('2021-07-13 14:06:33.123Z',1.2) ;") # def check_avg(self ,origin_query , check_query): # avg_result = tdSql.getResult(origin_query) # origin_result = tdSql.getResult(check_query) @@ -60,13 +60,15 @@ class TDTestCase: def tsbsIotQuery(self): + tdSql.execute("use db_tsbs") # test interval and partition tdSql.query(" SELECT avg(velocity) as mean_velocity ,name,driver,fleet FROM readings WHERE ts > 1451606400000 AND ts <= 1451606460000 partition BY name,driver,fleet; ") + print(tdSql.queryResult) parRows=tdSql.queryRows tdSql.query(" SELECT avg(velocity) as mean_velocity ,name,driver,fleet FROM readings WHERE ts > 1451606400000 AND ts <= 1451606460000 partition BY name,driver,fleet interval(10m); ") - # tdSql.checkRows(parRows) + tdSql.checkRows(parRows) # test insert into @@ -77,18 +79,53 @@ class TDTestCase: # test paitition interval fill - # tdSql.query("SELECT name,floor(avg(velocity)/10)/floor(avg(velocity)/10) AS mv FROM readings WHERE name!='' AND ts > '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by name interval(10m) fill(value,0) ;") + tdSql.query("SELECT name,floor(avg(velocity)/10)/floor(avg(velocity)/10) AS mv FROM readings WHERE name!='' AND ts > '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by name interval(10m) fill(value,0) ;") - # # test partition interval limit - # tdSql.query("SELECT ts,model,floor(2*(sum(nzs)/count(nzs)))/floor(2*(sum(nzs)/count(nzs))) AS broken_down FROM (SELECT ts,model, status/status AS nzs FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' ) WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition BY model,ts interval(10m) limit 10;") + # test partition interval limit (PRcore-TD-17410) + # tdSql.query("select name,driver from (SELECT name,driver,fleet ,avg(velocity) as mean_velocity FROM readings partition BY name,driver,fleet interval (10m) limit 1);") # tdSql.checkRows(10) # test partition interval Pseudo time-column tdSql.query("SELECT count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m)) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1;") + # 1 high-load: + # tdSql.query("SELECT ts,name,driver,current_load,load_capacity FROM (SELECT last(ts) as ts,name,driver, current_load,load_capacity FROM diagnostics WHERE fleet = 'South' partition by name,driver) WHERE current_load>= (0.9 * load_capacity) partition by name ORDER BY name desc, ts DESC;") + + # tdSql.query("SELECT ts,name,driver,current_load,load_capacity FROM (SELECT last(ts) as ts,name,driver, current_load,load_capacity FROM diagnostics WHERE fleet = 'South' partition by name,driver) WHERE current_load>= (0.9 * load_capacity) partition by name ORDER BY name ;") + + # 2 stationary-trucks + tdSql.query("select name,driver from (SELECT name,driver,fleet ,avg(velocity) as mean_velocity FROM readings WHERE ts > '2016-01-01T15:07:21Z' AND ts <= '2016-01-01T16:17:21Z' partition BY name,driver,fleet interval(10m) LIMIT 1)") + tdSql.query("select name,driver from (SELECT name,driver,fleet ,avg(velocity) as mean_velocity FROM readings WHERE ts > '2016-01-01T15:07:21Z' AND ts <= '2016-01-01T16:17:21Z' partition BY name,driver,fleet interval(10m) LIMIT 1) WHERE fleet = 'West' AND mean_velocity < 1000 partition BY name") + + # 3 long-driving-sessions + # tdSql.query("SELECT name,driver FROM(SELECT name,driver,count(*) AS ten_min FROM(SELECT _wstart as ts,name,driver,avg(velocity) as mean_velocity FROM readings where ts > '2016-01-01T00:00:34Z' AND ts <= '2016-01-01T04:00:34Z' partition BY name,driver interval(10m)) WHERE mean_velocity > 1 GROUP BY name,driver) WHERE ten_min > 22 ;") + + + #4 long-daily-sessions + tdSql.query("SELECT name,driver FROM(SELECT name,driver,count(*) AS ten_min FROM(SELECT name,driver,avg(velocity) as mean_velocity FROM readings WHERE fleet ='West' AND ts > '2016-01-01T12:31:37Z' AND ts <= '2016-01-05T12:31:37Z' partition BY name,driver interval(10m) ) WHERE mean_velocity > 1 GROUP BY name,driver) WHERE ten_min > 60") + + # 5. avg-daily-driving-duration + tdSql.query("select _wstart as ts,fleet,name,driver,count(mv)/6 as hours_driven from ( select _wstart as ts,fleet,name,driver,avg(velocity) as mv from readings where ts > '2016-01-01T00:00:00Z' and ts < '2016-01-05T00:00:01Z' partition by fleet,name,driver interval(10m)) where ts > '2016-01-01T00:00:00Z' and ts < '2016-01-05T00:00:01Z' partition by fleet,name,driver interval(1d) ;") + + + # 6. avg-daily-driving-session + #taosc core dumped + tdSql.execute("create table random_measure2_1 (ts timestamp,ela float, name binary(40))") + tdSql.query("SELECT ts,diff(mv) AS difka FROM (SELECT ts,name,floor(avg(velocity)/10)/floor(avg(velocity)/10) AS mv FROM readings WHERE name!='' AND ts > '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by name,ts interval(10m) fill(value,0)) GROUP BY name,ts;") + tdSql.query("SELECT _wstart,name,floor(avg(velocity)/10)/floor(avg(velocity)/10) AS mv FROM readings WHERE name!='' AND ts > '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by name interval(10m) fill(value,0)") + + # 7. avg-load + tdSql.query("SELECT fleet, model,avg(ml) AS mean_load_percentage FROM (SELECT fleet, model,current_load/load_capacity AS ml FROM diagnostics partition BY name, fleet, model) partition BY fleet, model order by fleet ;") + + # 8. daily-activity + tdSql.query(" SELECT model,ms1 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) fill(value,0)) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1;") + tdSql.query("SELECT _wstart,model,fleet,count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) fill(value,0)) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1 partition by model, fleet interval(1d) ;") + + #it's already supported: + # last-loc + tdSql.query("") - # test def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring tdLog.printNoPrefix("==========step1:create database and table,insert data ==============") self.prepareData() diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 8bafe3c966..7a72dc75b2 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -29,6 +29,7 @@ python3 ./test.py -f 1-insert/block_wise.py python3 ./test.py -f 1-insert/create_retentions.py python3 ./test.py -f 1-insert/table_param_ttl.py +python3 ./test.py -f 2-query/db.py python3 ./test.py -f 2-query/between.py python3 ./test.py -f 2-query/distinct.py python3 ./test.py -f 2-query/varchar.py diff --git a/tools/taos-tools b/tools/taos-tools index d807c3ffa6..bd496f76b6 160000 --- a/tools/taos-tools +++ b/tools/taos-tools @@ -1 +1 @@ -Subproject commit d807c3ffa6f750f7765e102917d1328cadf21c13 +Subproject commit bd496f76b64931c66da2f8b0f24143a98a881cde diff --git a/tools/taosws-rs b/tools/taosws-rs index c5fded266d..7a94ffab45 160000 --- a/tools/taosws-rs +++ b/tools/taosws-rs @@ -1 +1 @@ -Subproject commit c5fded266d3b10508e38bf3285bb7ecf798bc343 +Subproject commit 7a94ffab45f08e16f09b3f430fe75d717054adb6