Merge pull request #15569 from taosdata/feature/TD-11274-3.0
feat: rsma1/rsma2 support replication
This commit is contained in:
commit
0bf681662d
|
@ -1713,8 +1713,9 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) {
|
||||||
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
|
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
|
||||||
|
|
||||||
int32_t rows = pDataBlock->info.rows;
|
int32_t rows = pDataBlock->info.rows;
|
||||||
printf("%s |block type %d |child id %d|group id %" PRIu64 "\n", flag, (int32_t)pDataBlock->info.type,
|
printf("%s |block ver %" PRIi64 " |block type %d |child id %d|group id %" PRIu64 "\n", flag,
|
||||||
pDataBlock->info.childId, pDataBlock->info.groupId);
|
pDataBlock->info.version, (int32_t)pDataBlock->info.type, pDataBlock->info.childId,
|
||||||
|
pDataBlock->info.groupId);
|
||||||
for (int32_t j = 0; j < rows; j++) {
|
for (int32_t j = 0; j < rows; j++) {
|
||||||
printf("%s |", flag);
|
printf("%s |", flag);
|
||||||
for (int32_t k = 0; k < numOfCols; k++) {
|
for (int32_t k = 0; k < numOfCols; k++) {
|
||||||
|
|
|
@ -31,6 +31,7 @@ target_sources(
|
||||||
"src/sma/smaOpen.c"
|
"src/sma/smaOpen.c"
|
||||||
"src/sma/smaCommit.c"
|
"src/sma/smaCommit.c"
|
||||||
"src/sma/smaRollup.c"
|
"src/sma/smaRollup.c"
|
||||||
|
"src/sma/smaSnapshot.c"
|
||||||
"src/sma/smaTimeRange.c"
|
"src/sma/smaTimeRange.c"
|
||||||
|
|
||||||
# tsdb
|
# tsdb
|
||||||
|
|
|
@ -209,6 +209,9 @@ int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen,
|
||||||
|
|
||||||
// smaFileUtil ================
|
// smaFileUtil ================
|
||||||
|
|
||||||
|
typedef struct SQTaskFReader SQTaskFReader;
|
||||||
|
typedef struct SQTaskFWriter SQTaskFWriter;
|
||||||
|
|
||||||
#define TD_FILE_HEAD_SIZE 512
|
#define TD_FILE_HEAD_SIZE 512
|
||||||
|
|
||||||
typedef struct STFInfo STFInfo;
|
typedef struct STFInfo STFInfo;
|
||||||
|
|
|
@ -62,6 +62,8 @@ typedef struct SMetaSnapReader SMetaSnapReader;
|
||||||
typedef struct SMetaSnapWriter SMetaSnapWriter;
|
typedef struct SMetaSnapWriter SMetaSnapWriter;
|
||||||
typedef struct STsdbSnapReader STsdbSnapReader;
|
typedef struct STsdbSnapReader STsdbSnapReader;
|
||||||
typedef struct STsdbSnapWriter STsdbSnapWriter;
|
typedef struct STsdbSnapWriter STsdbSnapWriter;
|
||||||
|
typedef struct SRsmaSnapReader SRsmaSnapReader;
|
||||||
|
typedef struct SRsmaSnapWriter SRsmaSnapWriter;
|
||||||
typedef struct SSnapDataHdr SSnapDataHdr;
|
typedef struct SSnapDataHdr SSnapDataHdr;
|
||||||
|
|
||||||
#define VNODE_META_DIR "meta"
|
#define VNODE_META_DIR "meta"
|
||||||
|
@ -196,13 +198,21 @@ int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWr
|
||||||
int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
|
int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
|
||||||
int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback);
|
int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback);
|
||||||
// STsdbSnapReader ========================================
|
// STsdbSnapReader ========================================
|
||||||
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapReader** ppReader);
|
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type, STsdbSnapReader** ppReader);
|
||||||
int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader);
|
int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader);
|
||||||
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData);
|
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData);
|
||||||
// STsdbSnapWriter ========================================
|
// STsdbSnapWriter ========================================
|
||||||
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter);
|
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter);
|
||||||
int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
|
int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
|
||||||
int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback);
|
int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback);
|
||||||
|
// SRsmaSnapReader ========================================
|
||||||
|
int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapReader** ppReader);
|
||||||
|
int32_t rsmaSnapReaderClose(SRsmaSnapReader** ppReader);
|
||||||
|
int32_t rsmaSnapRead(SRsmaSnapReader* pReader, uint8_t** ppData);
|
||||||
|
// SRsmaSnapWriter ========================================
|
||||||
|
int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapWriter** ppWriter);
|
||||||
|
int32_t rsmaSnapWrite(SRsmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
|
||||||
|
int32_t rsmaSnapWriterClose(SRsmaSnapWriter** ppWriter, int8_t rollback);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t streamType; // sma or other
|
int8_t streamType; // sma or other
|
||||||
|
@ -314,6 +324,15 @@ struct SSma {
|
||||||
// sma
|
// sma
|
||||||
void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
|
void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
|
||||||
|
|
||||||
|
enum {
|
||||||
|
SNAP_DATA_META = 0,
|
||||||
|
SNAP_DATA_TSDB = 1,
|
||||||
|
SNAP_DATA_DEL = 2,
|
||||||
|
SNAP_DATA_RSMA1 = 3,
|
||||||
|
SNAP_DATA_RSMA2 = 4,
|
||||||
|
SNAP_DATA_QTASK = 5,
|
||||||
|
};
|
||||||
|
|
||||||
struct SSnapDataHdr {
|
struct SSnapDataHdr {
|
||||||
int8_t type;
|
int8_t type;
|
||||||
int64_t index;
|
int64_t index;
|
||||||
|
|
|
@ -109,7 +109,7 @@ int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
|
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
|
||||||
pHdr->type = 0; // TODO: use macro
|
pHdr->type = SNAP_DATA_META;
|
||||||
pHdr->size = nData;
|
pHdr->size = nData;
|
||||||
memcpy(pHdr->data, pData, nData);
|
memcpy(pHdr->data, pData, nData);
|
||||||
|
|
||||||
|
|
|
@ -49,7 +49,8 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapRead
|
||||||
|
|
||||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||||
if (pSma->pRSmaTsdb[i]) {
|
if (pSma->pRSmaTsdb[i]) {
|
||||||
code = tsdbSnapReaderOpen(pSma->pRSmaTsdb[i], sver, ever, &pReader->pDataReader[i]);
|
code = tsdbSnapReaderOpen(pSma->pRSmaTsdb[i], sver, ever, i == 0 ? SNAP_DATA_RSMA1 : SNAP_DATA_RSMA2,
|
||||||
|
&pReader->pDataReader[i]);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
@ -221,10 +222,9 @@ int32_t rsmaSnapWriterClose(SRsmaSnapWriter** ppWriter, int8_t rollback) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
smaInfo("vgId:%d vnode snapshot rsma writer close succeed", SMA_VID(pWriter->pSma));
|
||||||
taosMemoryFree(pWriter);
|
taosMemoryFree(pWriter);
|
||||||
*ppWriter = NULL;
|
*ppWriter = NULL;
|
||||||
|
|
||||||
smaInfo("vgId:%d vnode snapshot rsma writer close succeed", SMA_VID(pWriter->pSma));
|
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
|
@ -245,15 +245,17 @@ int32_t rsmaSnapWrite(SRsmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData)
|
||||||
code = tsdbSnapWrite(pWriter->pDataWriter[1], pData, nData);
|
code = tsdbSnapWrite(pWriter->pDataWriter[1], pData, nData);
|
||||||
} else if (pHdr->type == SNAP_DATA_QTASK) {
|
} else if (pHdr->type == SNAP_DATA_QTASK) {
|
||||||
code = rsmaSnapWriteQTaskInfo(pWriter, pData, nData);
|
code = rsmaSnapWriteQTaskInfo(pWriter, pData, nData);
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
if (code < 0) goto _err;
|
if (code < 0) goto _err;
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
smaInfo("vgId:%d rsma snapshot write for data %" PRIi8 " succeed", SMA_VID(pWriter->pSma), pHdr->type);
|
smaInfo("vgId:%d rsma snapshot write for data type %" PRIi8 " succeed", SMA_VID(pWriter->pSma), pHdr->type);
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
smaError("vgId:%d rsma snapshot write for data %" PRIi8 " failed since %s", SMA_VID(pWriter->pSma), pHdr->type,
|
smaError("vgId:%d rsma snapshot write for data type %" PRIi8 " failed since %s", SMA_VID(pWriter->pSma), pHdr->type,
|
||||||
tstrerror(code));
|
tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ struct STsdbSnapReader {
|
||||||
int64_t sver;
|
int64_t sver;
|
||||||
int64_t ever;
|
int64_t ever;
|
||||||
STsdbFS fs;
|
STsdbFS fs;
|
||||||
|
int8_t type;
|
||||||
// for data file
|
// for data file
|
||||||
int8_t dataDone;
|
int8_t dataDone;
|
||||||
int32_t fid;
|
int32_t fid;
|
||||||
|
@ -62,7 +63,8 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
|
||||||
pReader->iBlockIdx = 0;
|
pReader->iBlockIdx = 0;
|
||||||
pReader->pBlockIdx = NULL;
|
pReader->pBlockIdx = NULL;
|
||||||
|
|
||||||
tsdbInfo("vgId:%d vnode snapshot tsdb open data file to read, fid:%d", TD_VID(pTsdb->pVnode), pReader->fid);
|
tsdbInfo("vgId:%d vnode snapshot tsdb open data file to read for %s, fid:%d", TD_VID(pTsdb->pVnode), pTsdb->path,
|
||||||
|
pReader->fid);
|
||||||
}
|
}
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
|
@ -130,7 +132,7 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
|
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
|
||||||
pHdr->type = 1;
|
pHdr->type = pReader->type;
|
||||||
pHdr->size = size;
|
pHdr->size = size;
|
||||||
|
|
||||||
TABLEID* pId = (TABLEID*)(&pHdr[1]);
|
TABLEID* pId = (TABLEID*)(&pHdr[1]);
|
||||||
|
@ -139,9 +141,9 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
|
||||||
|
|
||||||
tPutBlockData((uint8_t*)(&pId[1]), &pReader->nBlockData);
|
tPutBlockData((uint8_t*)(&pId[1]), &pReader->nBlockData);
|
||||||
|
|
||||||
tsdbInfo("vgId:%d vnode snapshot read data, fid:%d suid:%" PRId64 " uid:%" PRId64
|
tsdbInfo("vgId:%d vnode snapshot read data for %s, fid:%d suid:%" PRId64 " uid:%" PRId64
|
||||||
" iBlock:%d minVersion:%d maxVersion:%d nRow:%d out of %d size:%d",
|
" iBlock:%d minVersion:%d maxVersion:%d nRow:%d out of %d size:%d",
|
||||||
TD_VID(pTsdb->pVnode), pReader->fid, pReader->pBlockIdx->suid, pReader->pBlockIdx->uid,
|
TD_VID(pTsdb->pVnode), pTsdb->path, pReader->fid, pReader->pBlockIdx->suid, pReader->pBlockIdx->uid,
|
||||||
pReader->iBlock - 1, pBlock->minVersion, pBlock->maxVersion, pReader->nBlockData.nRow, pBlock->nRow,
|
pReader->iBlock - 1, pBlock->minVersion, pBlock->maxVersion, pReader->nBlockData.nRow, pBlock->nRow,
|
||||||
size);
|
size);
|
||||||
|
|
||||||
|
@ -154,7 +156,8 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tsdbError("vgId:%d vnode snapshot tsdb read data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
tsdbError("vgId:%d vnode snapshot tsdb read data for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
|
||||||
|
tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -212,7 +215,7 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
|
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
|
||||||
pHdr->type = 2;
|
pHdr->type = SNAP_DATA_DEL;
|
||||||
pHdr->size = size;
|
pHdr->size = size;
|
||||||
|
|
||||||
TABLEID* pId = (TABLEID*)(&pHdr[1]);
|
TABLEID* pId = (TABLEID*)(&pHdr[1]);
|
||||||
|
@ -228,8 +231,8 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
|
||||||
n += tPutDelData((*ppData) + n, pDelData);
|
n += tPutDelData((*ppData) + n, pDelData);
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbInfo("vgId:%d vnode snapshot tsdb read del data, suid:%" PRId64 " uid:%d" PRId64 " size:%d",
|
tsdbInfo("vgId:%d vnode snapshot tsdb read del data for %s, suid:%" PRId64 " uid:%d" PRId64 " size:%d",
|
||||||
TD_VID(pTsdb->pVnode), pDelIdx->suid, pDelIdx->uid, size);
|
TD_VID(pTsdb->pVnode), pTsdb->path, pDelIdx->suid, pDelIdx->uid, size);
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -238,11 +241,12 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tsdbError("vgId:%d vnode snapshot tsdb read del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
tsdbError("vgId:%d vnode snapshot tsdb read del for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->pVnode,
|
||||||
|
tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapReader** ppReader) {
|
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type, STsdbSnapReader** ppReader) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
STsdbSnapReader* pReader = NULL;
|
STsdbSnapReader* pReader = NULL;
|
||||||
|
|
||||||
|
@ -255,6 +259,7 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapRe
|
||||||
pReader->pTsdb = pTsdb;
|
pReader->pTsdb = pTsdb;
|
||||||
pReader->sver = sver;
|
pReader->sver = sver;
|
||||||
pReader->ever = ever;
|
pReader->ever = ever;
|
||||||
|
pReader->type = type;
|
||||||
|
|
||||||
code = taosThreadRwlockRdlock(&pTsdb->rwLock);
|
code = taosThreadRwlockRdlock(&pTsdb->rwLock);
|
||||||
if (code) {
|
if (code) {
|
||||||
|
@ -297,12 +302,13 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapRe
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbInfo("vgId:%d vnode snapshot tsdb reader opened", TD_VID(pTsdb->pVnode));
|
tsdbInfo("vgId:%d vnode snapshot tsdb reader opened for %s", TD_VID(pTsdb->pVnode), pTsdb->path);
|
||||||
*ppReader = pReader;
|
*ppReader = pReader;
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tsdbError("vgId:%d vnode snapshot tsdb reader open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
tsdbError("vgId:%d vnode snapshot tsdb reader open for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
|
||||||
|
tstrerror(code));
|
||||||
*ppReader = NULL;
|
*ppReader = NULL;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -327,7 +333,7 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
|
||||||
|
|
||||||
tsdbFSUnref(pReader->pTsdb, &pReader->fs);
|
tsdbFSUnref(pReader->pTsdb, &pReader->fs);
|
||||||
|
|
||||||
tsdbInfo("vgId:%d vnode snapshot tsdb reader closed", TD_VID(pReader->pTsdb->pVnode));
|
tsdbInfo("vgId:%d vnode snapshot tsdb reader closed for %s", TD_VID(pReader->pTsdb->pVnode), pReader->pTsdb->path);
|
||||||
|
|
||||||
taosMemoryFree(pReader);
|
taosMemoryFree(pReader);
|
||||||
*ppReader = NULL;
|
*ppReader = NULL;
|
||||||
|
@ -368,10 +374,12 @@ int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) {
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
tsdbDebug("vgId:%d vnode snapshot tsdb read for %s", TD_VID(pReader->pTsdb->pVnode), pReader->pTsdb->path);
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tsdbError("vgId:%d vnode snapshot tsdb read failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
tsdbError("vgId:%d vnode snapshot tsdb read for %s failed since %s", TD_VID(pReader->pTsdb->pVnode),
|
||||||
|
pReader->pTsdb->path, tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -436,7 +444,8 @@ static int32_t tsdbSnapWriteAppendData(STsdbSnapWriter* pWriter, uint8_t* pData,
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tsdbError("vgId:%d tsdb snapshot write append data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
|
tsdbError("vgId:%d tsdb snapshot write append data for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
|
||||||
|
pWriter->pTsdb->path, tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -522,9 +531,12 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
tsdbInfo("vgId:%d tsdb snapshot write table data end for %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path);
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
|
tsdbError("vgId:%d tsdb snapshot write table data end for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
|
||||||
|
pWriter->pTsdb->path, tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -570,6 +582,8 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
|
tsdbError("vgId:%d tsdb snapshot move write table data for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
|
||||||
|
pWriter->pTsdb->path, tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -708,8 +722,8 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) {
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tsdbError("vgId:%d vnode snapshot tsdb write table data impl failed since %s", TD_VID(pWriter->pTsdb->pVnode),
|
tsdbError("vgId:%d vnode snapshot tsdb write table data impl for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
|
||||||
tstrerror(code));
|
pWriter->pTsdb->path, tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -794,11 +808,12 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, TABLEID id) {
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
tsdbDebug("vgId:%d vnode snapshot tsdb write data impl for %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path);
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tsdbError("vgId:%d vnode snapshot tsdb write data impl failed since %s", TD_VID(pWriter->pTsdb->pVnode),
|
tsdbError("vgId:%d vnode snapshot tsdb write data impl for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
|
||||||
tstrerror(code));
|
pWriter->pTsdb->path, tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -833,11 +848,12 @@ static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) {
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
tsdbInfo("vgId:%d vnode snapshot tsdb writer data end", TD_VID(pTsdb->pVnode));
|
tsdbInfo("vgId:%d vnode snapshot tsdb writer data end for %s", TD_VID(pTsdb->pVnode), pTsdb->path);
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tsdbError("vgId:%d vnode snapshot tsdb writer data end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
tsdbError("vgId:%d vnode snapshot tsdb writer data end for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
|
||||||
|
tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -920,12 +936,13 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
|
||||||
code = tsdbSnapWriteTableData(pWriter, id);
|
code = tsdbSnapWriteTableData(pWriter, id);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
tsdbInfo("vgId:%d vnode snapshot tsdb write data, fid:%d suid:%" PRId64 " uid:%" PRId64 " nRow:%d",
|
tsdbInfo("vgId:%d vnode snapshot tsdb write data for %s, fid:%d suid:%" PRId64 " uid:%" PRId64 " nRow:%d",
|
||||||
TD_VID(pTsdb->pVnode), fid, id.suid, id.suid, pBlockData->nRow);
|
TD_VID(pTsdb->pVnode), pTsdb->path, fid, id.suid, id.suid, pBlockData->nRow);
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tsdbError("vgId:%d vnode snapshot tsdb write data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
tsdbError("vgId:%d vnode snapshot tsdb write data for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
|
||||||
|
tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1015,7 +1032,8 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tsdbError("vgId:%d vnode snapshot tsdb write del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
tsdbError("vgId:%d vnode snapshot tsdb write del for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
|
||||||
|
tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1056,11 +1074,12 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) {
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
tsdbInfo("vgId:%d vnode snapshot tsdb write del end", TD_VID(pTsdb->pVnode));
|
tsdbInfo("vgId:%d vnode snapshot tsdb write del for %s end", TD_VID(pTsdb->pVnode), pTsdb->path);
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tsdbError("vgId:%d vnode snapshot tsdb write del end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
tsdbError("vgId:%d vnode snapshot tsdb write del end for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
|
||||||
|
tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1127,10 +1146,12 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
|
||||||
}
|
}
|
||||||
|
|
||||||
*ppWriter = pWriter;
|
*ppWriter = pWriter;
|
||||||
return code;
|
|
||||||
|
|
||||||
|
tsdbInfo("vgId:%d tsdb snapshot writer open for %s succeed", TD_VID(pTsdb->pVnode), pTsdb->path);
|
||||||
|
return code;
|
||||||
_err:
|
_err:
|
||||||
tsdbError("vgId:%d tsdb snapshot writer open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
tsdbError("vgId:%d tsdb snapshot writer open for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
|
||||||
|
tstrerror(code));
|
||||||
*ppWriter = NULL;
|
*ppWriter = NULL;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1157,14 +1178,16 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tsdbInfo("vgId:%d vnode snapshot tsdb writer close for %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path);
|
||||||
taosMemoryFree(pWriter);
|
taosMemoryFree(pWriter);
|
||||||
*ppWriter = NULL;
|
*ppWriter = NULL;
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tsdbError("vgId:%d vnode snapshot tsdb writer close failed since %s", TD_VID(pWriter->pTsdb->pVnode),
|
tsdbError("vgId:%d vnode snapshot tsdb writer close for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
|
||||||
tstrerror(code));
|
pWriter->pTsdb->path, tstrerror(code));
|
||||||
|
taosMemoryFree(pWriter);
|
||||||
|
*ppWriter = NULL;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1173,7 +1196,7 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData)
|
||||||
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
|
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
|
||||||
|
|
||||||
// ts data
|
// ts data
|
||||||
if (pHdr->type == 1) {
|
if (pHdr->type == SNAP_DATA_TSDB) {
|
||||||
code = tsdbSnapWriteData(pWriter, pData, nData);
|
code = tsdbSnapWriteData(pWriter, pData, nData);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
|
@ -1186,15 +1209,17 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData)
|
||||||
}
|
}
|
||||||
|
|
||||||
// del data
|
// del data
|
||||||
if (pHdr->type == 2) {
|
if (pHdr->type == SNAP_DATA_DEL) {
|
||||||
code = tsdbSnapWriteDel(pWriter, pData, nData);
|
code = tsdbSnapWriteDel(pWriter, pData, nData);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
tsdbDebug("vgId:%d tsdb snapshow write for %s succeed", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path);
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tsdbError("vgId:%d tsdb snapshow write failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
|
tsdbError("vgId:%d tsdb snapshow write for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path,
|
||||||
|
tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,7 +28,8 @@ struct SVSnapReader {
|
||||||
int8_t tsdbDone;
|
int8_t tsdbDone;
|
||||||
STsdbSnapReader *pTsdbReader;
|
STsdbSnapReader *pTsdbReader;
|
||||||
// rsma
|
// rsma
|
||||||
int8_t rsmaDone[TSDB_RETENTION_L2];
|
int8_t rsmaDone;
|
||||||
|
SRsmaSnapReader *pRsmaReader;
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapReader **ppReader) {
|
int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapReader **ppReader) {
|
||||||
|
@ -57,6 +58,10 @@ _err:
|
||||||
int32_t vnodeSnapReaderClose(SVSnapReader *pReader) {
|
int32_t vnodeSnapReaderClose(SVSnapReader *pReader) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
if (pReader->pRsmaReader) {
|
||||||
|
rsmaSnapReaderClose(&pReader->pRsmaReader);
|
||||||
|
}
|
||||||
|
|
||||||
if (pReader->pTsdbReader) {
|
if (pReader->pTsdbReader) {
|
||||||
tsdbSnapReaderClose(&pReader->pTsdbReader);
|
tsdbSnapReaderClose(&pReader->pTsdbReader);
|
||||||
}
|
}
|
||||||
|
@ -99,7 +104,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
|
||||||
if (!pReader->tsdbDone) {
|
if (!pReader->tsdbDone) {
|
||||||
// open if not
|
// open if not
|
||||||
if (pReader->pTsdbReader == NULL) {
|
if (pReader->pTsdbReader == NULL) {
|
||||||
code = tsdbSnapReaderOpen(pReader->pVnode->pTsdb, pReader->sver, pReader->ever, &pReader->pTsdbReader);
|
code = tsdbSnapReaderOpen(pReader->pVnode->pTsdb, pReader->sver, pReader->ever, SNAP_DATA_TSDB, &pReader->pTsdbReader);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -118,40 +123,26 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
|
||||||
}
|
}
|
||||||
|
|
||||||
// RSMA ==============
|
// RSMA ==============
|
||||||
#if 0
|
if (VND_IS_RSMA(pReader->pVnode) && !pReader->rsmaDone) {
|
||||||
if (VND_IS_RSMA(pReader->pVnode)) {
|
// open if not
|
||||||
// RSMA1/RSMA2
|
if (pReader->pRsmaReader == NULL) {
|
||||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
code = rsmaSnapReaderOpen(pReader->pVnode->pSma, pReader->sver, pReader->ever, &pReader->pRsmaReader);
|
||||||
if (!pReader->rsmaDone[i]) {
|
if (code) goto _err;
|
||||||
if (!pReader->pVnode->pSma->pRSmaTsdb[i]) {
|
}
|
||||||
// no valid tsdb
|
|
||||||
pReader->rsmaDone[i] = 1;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (pReader->pTsdbReader == NULL) {
|
|
||||||
code = tsdbSnapReaderOpen(pReader->pVnode->pSma->pRSmaTsdb[i], pReader->sver, pReader->ever,
|
|
||||||
&pReader->pTsdbReader);
|
|
||||||
if (code) goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = tsdbSnapRead(pReader->pTsdbReader, ppData);
|
code = rsmaSnapRead(pReader->pRsmaReader, ppData);
|
||||||
if (code) {
|
if (code) {
|
||||||
goto _err;
|
goto _err;
|
||||||
} else {
|
} else {
|
||||||
if (*ppData) {
|
if (*ppData) {
|
||||||
goto _exit;
|
goto _exit;
|
||||||
} else {
|
} else {
|
||||||
pReader->tsdbDone = 1;
|
pReader->tsdbDone = 1;
|
||||||
code = tsdbSnapReaderClose(&pReader->pTsdbReader);
|
code = rsmaSnapReaderClose(&pReader->pRsmaReader);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// QTaskInfoFile
|
|
||||||
// TODO ...
|
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
|
|
||||||
*ppData = NULL;
|
*ppData = NULL;
|
||||||
*nData = 0;
|
*nData = 0;
|
||||||
|
@ -186,6 +177,8 @@ struct SVSnapWriter {
|
||||||
SMetaSnapWriter *pMetaSnapWriter;
|
SMetaSnapWriter *pMetaSnapWriter;
|
||||||
// tsdb
|
// tsdb
|
||||||
STsdbSnapWriter *pTsdbSnapWriter;
|
STsdbSnapWriter *pTsdbSnapWriter;
|
||||||
|
// rsma
|
||||||
|
SRsmaSnapWriter *pRsmaSnapWriter;
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWriter **ppWriter) {
|
int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWriter **ppWriter) {
|
||||||
|
@ -235,6 +228,11 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pWriter->pRsmaSnapWriter) {
|
||||||
|
code = rsmaSnapWriterClose(&pWriter->pRsmaSnapWriter, rollback);
|
||||||
|
if (code) goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
if (!rollback) {
|
if (!rollback) {
|
||||||
SVnodeInfo info = {0};
|
SVnodeInfo info = {0};
|
||||||
char dir[TSDB_FILENAME_LEN];
|
char dir[TSDB_FILENAME_LEN];
|
||||||
|
@ -282,28 +280,51 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
|
||||||
vInfo("vgId:%d vnode snapshot write data, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode), pHdr->index,
|
vInfo("vgId:%d vnode snapshot write data, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode), pHdr->index,
|
||||||
pHdr->type, nData);
|
pHdr->type, nData);
|
||||||
|
|
||||||
if (pHdr->type == 0) {
|
switch (pHdr->type) {
|
||||||
// meta
|
case SNAP_DATA_META: {
|
||||||
|
// meta
|
||||||
|
if (pWriter->pMetaSnapWriter == NULL) {
|
||||||
|
code = metaSnapWriterOpen(pVnode->pMeta, pWriter->sver, pWriter->ever, &pWriter->pMetaSnapWriter);
|
||||||
|
if (code) goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
if (pWriter->pMetaSnapWriter == NULL) {
|
code = metaSnapWrite(pWriter->pMetaSnapWriter, pData, nData);
|
||||||
code = metaSnapWriterOpen(pVnode->pMeta, pWriter->sver, pWriter->ever, &pWriter->pMetaSnapWriter);
|
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
}
|
} break;
|
||||||
|
case SNAP_DATA_TSDB: {
|
||||||
|
// tsdb
|
||||||
|
if (pWriter->pTsdbSnapWriter == NULL) {
|
||||||
|
code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, &pWriter->pTsdbSnapWriter);
|
||||||
|
if (code) goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
code = metaSnapWrite(pWriter->pMetaSnapWriter, pData, nData);
|
code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pData, nData);
|
||||||
if (code) goto _err;
|
|
||||||
} else {
|
|
||||||
// tsdb
|
|
||||||
|
|
||||||
if (pWriter->pTsdbSnapWriter == NULL) {
|
|
||||||
code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, &pWriter->pTsdbSnapWriter);
|
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
}
|
} break;
|
||||||
|
case SNAP_DATA_RSMA1:
|
||||||
|
case SNAP_DATA_RSMA2: {
|
||||||
|
// rsma1/rsma2
|
||||||
|
if (pWriter->pRsmaSnapWriter == NULL) {
|
||||||
|
code = rsmaSnapWriterOpen(pVnode->pSma, pWriter->sver, pWriter->ever, &pWriter->pRsmaSnapWriter);
|
||||||
|
if (code) goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pData, nData);
|
code = rsmaSnapWrite(pWriter->pRsmaSnapWriter, pData, nData);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
} break;
|
||||||
|
case SNAP_DATA_QTASK: {
|
||||||
|
// qtask for rsma
|
||||||
|
if (pWriter->pRsmaSnapWriter == NULL) {
|
||||||
|
code = rsmaSnapWriterOpen(pVnode->pSma, pWriter->sver, pWriter->ever, &pWriter->pRsmaSnapWriter);
|
||||||
|
if (code) goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = rsmaSnapWrite(pWriter->pRsmaSnapWriter, pData, nData);
|
||||||
|
if (code) goto _err;
|
||||||
|
} break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
|
|
|
@ -283,12 +283,14 @@ print ================== server restart completed
|
||||||
sql connect
|
sql connect
|
||||||
sql use first_db0;
|
sql use first_db0;
|
||||||
|
|
||||||
sql select last(*), tbname from m1 group by tbname;
|
sql select last(*), tbname from m1 group by tbname order by tbname;
|
||||||
|
|
||||||
if $rows != 2 then
|
if $rows != 2 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
if $data00 != @20-03-01 01:01:01.000@ then
|
if $data00 != @20-03-01 01:01:01.000@ then
|
||||||
|
print data00 $data00 != 20-03-01 01:01:01.000@
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
|
|
@ -47,7 +47,7 @@ endi
|
||||||
|
|
||||||
$replica = 3
|
$replica = 3
|
||||||
$vgroups = 1
|
$vgroups = 1
|
||||||
$retentions = 5s:7d,15s:21d
|
$retentions = 5s:7d,15s:21d,1m:365d
|
||||||
|
|
||||||
print ============= create database
|
print ============= create database
|
||||||
sql create database db replica $replica vgroups $vgroups retentions $retentions
|
sql create database db replica $replica vgroups $vgroups retentions $retentions
|
||||||
|
@ -114,7 +114,7 @@ endi
|
||||||
|
|
||||||
vg_ready:
|
vg_ready:
|
||||||
print ====> create stable/child table
|
print ====> create stable/child table
|
||||||
sql create table stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int) rollup(sum)
|
sql create table stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int) rollup(sum) watermark 3s,3s max_delay 3s,3s
|
||||||
|
|
||||||
sql show stables
|
sql show stables
|
||||||
if $rows != 1 then
|
if $rows != 1 then
|
||||||
|
@ -129,20 +129,28 @@ system sh/exec.sh -n dnode4 -s stop -x SIGINT
|
||||||
sleep 3000
|
sleep 3000
|
||||||
|
|
||||||
|
|
||||||
print ===> write 100 records
|
print ===> write 0-50 records
|
||||||
$N = 100
|
$ms = 0
|
||||||
$count = 0
|
$cnt = 0
|
||||||
while $count < $N
|
while $cnt < 50
|
||||||
$ms = 1659000000000 + $count
|
$ms = $cnt . m
|
||||||
sql insert into ct1 values( $ms , $count , 2.1, 3.1)
|
sql insert into ct1 values (now + $ms , $cnt , 2.1, 3.1)
|
||||||
$count = $count + 1
|
$cnt = $cnt + 1
|
||||||
endw
|
endw
|
||||||
|
print ===> flush database db
|
||||||
|
sql flush database db;
|
||||||
|
sleep 5000
|
||||||
|
|
||||||
|
print ===> write 51-100 records
|
||||||
|
while $cnt < 100
|
||||||
|
$ms = $cnt . m
|
||||||
|
sql insert into ct1 values (now + $ms , $cnt , 2.1, 3.1)
|
||||||
|
$cnt = $cnt + 1
|
||||||
|
endw
|
||||||
|
|
||||||
#sql flush database db;
|
print ===> flush database db
|
||||||
|
sql flush database db;
|
||||||
|
sleep 5000
|
||||||
sleep 3000
|
|
||||||
|
|
||||||
|
|
||||||
print ===> stop dnode1 dnode2 dnode3
|
print ===> stop dnode1 dnode2 dnode3
|
||||||
|
@ -150,8 +158,6 @@ system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
||||||
system sh/exec.sh -n dnode3 -s stop -x SIGINT
|
system sh/exec.sh -n dnode3 -s stop -x SIGINT
|
||||||
|
|
||||||
sleep 10000
|
|
||||||
|
|
||||||
########################################################
|
########################################################
|
||||||
print ===> start dnode1 dnode2 dnode3 dnode4
|
print ===> start dnode1 dnode2 dnode3 dnode4
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
@ -164,7 +170,7 @@ sleep 3000
|
||||||
print =============== query data
|
print =============== query data
|
||||||
sql connect
|
sql connect
|
||||||
sql use db
|
sql use db
|
||||||
sql select * from ct1
|
sql select * from ct1 where ts > now - 1d
|
||||||
print rows: $rows
|
print rows: $rows
|
||||||
print $data00 $data01 $data02
|
print $data00 $data01 $data02
|
||||||
if $rows != 100 then
|
if $rows != 100 then
|
||||||
|
|
|
@ -213,7 +213,7 @@ class TDTestCase:
|
||||||
tdSql.error("select irate(c1), abs(c1) from ct4 ")
|
tdSql.error("select irate(c1), abs(c1) from ct4 ")
|
||||||
|
|
||||||
# agg functions mix with agg functions
|
# agg functions mix with agg functions
|
||||||
tdSql.query("select irate(c1), count(c5) from stb1 partition by tbname ")
|
tdSql.query("select irate(c1), count(c5) from stb1 partition by tbname order by tbname")
|
||||||
tdSql.checkData(0, 0, 0.000000000)
|
tdSql.checkData(0, 0, 0.000000000)
|
||||||
tdSql.checkData(1, 0, 0.000000000)
|
tdSql.checkData(1, 0, 0.000000000)
|
||||||
tdSql.checkData(0, 1, 13)
|
tdSql.checkData(0, 1, 13)
|
||||||
|
|
Loading…
Reference in New Issue