Merge pull request #23017 from taosdata/case/TD-26408-MAIN

fix: add case for lost data after  split vgroup (main)
This commit is contained in:
Alex Duan 2023-10-10 00:54:34 -05:00 committed by GitHub
commit 9da96ebf63
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 548 additions and 85 deletions

View File

@ -105,6 +105,7 @@ int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData) {
pHdr->size = len;
memcpy(pHdr->data, rowData, len);
tqDebug("vgId:%d, vnode stream-state snapshot read data success", TD_VID(pReader->pTq->pVnode));
taosMemoryFree(rowData);
return code;
_err:

View File

@ -459,7 +459,7 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
} else {
stError("failed to start stream backend at %s, reason: %s, restart from default state dir:%s", chkp,
tstrerror(TAOS_SYSTEM_ERROR(errno)), state);
tstrerror(TAOS_SYSTEM_ERROR(errno)), state);
taosMkDir(state);
}
taosMemoryFree(chkp);
@ -813,6 +813,10 @@ int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t*
}
int32_t nCf = taosArrayGetSize(pHandle);
if (nCf == 0) {
taosArrayDestroy(pHandle);
return nCf;
}
rocksdb_column_family_handle_t** ppCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
for (int i = 0; i < nCf; i++) {
@ -845,6 +849,9 @@ _ERROR:
return code;
}
int32_t chkpPreFlushDb(rocksdb_t* db, rocksdb_column_family_handle_t** cf, int32_t nCf) {
if (nCf == 0) {
return 0;
}
int code = 0;
char* err = NULL;
@ -910,7 +917,7 @@ int32_t streamBackendTriggerChkp(void* arg, char* dst) {
stError("stream backend:%p failed to do checkpoint at:%s", pHandle, dst);
} else {
stDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, dst,
taosGetTimestampMs() - st);
taosGetTimestampMs() - st);
}
} else {
stError("stream backend:%p failed to flush db at:%s", pHandle, dst);
@ -985,9 +992,9 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
stError("stream backend:%p failed to do checkpoint at:%s", pHandle, pChkpIdDir);
} else {
stDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, pChkpIdDir,
taosGetTimestampMs() - st);
taosGetTimestampMs() - st);
}
} else {
} else {
stError("stream backend:%p failed to flush db at:%s", pHandle, pChkpIdDir);
}
// release all ref to cfWrapper;
@ -1711,7 +1718,7 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
char* status[] = {"close", "drop"};
stInfo("start to %s state %p on backendWrapper %p %s", status[remove == false ? 0 : 1], pState, wrapper,
wrapper->idstr);
wrapper->idstr);
wrapper->remove |= remove; // update by other pState
taosReleaseRef(streamBackendCfWrapperId, pState->pTdbState->backendCfWrapperId);
}
@ -1783,35 +1790,36 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
((rocksdb_column_family_handle_t**)wrapper->pHandle)[idx]);
}
#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \
do { \
code = 0; \
char buf[128] = {0}; \
char* err = NULL; \
int i = streamStateGetCfIdx(pState, funcname); \
if (i < 0) { \
stWarn("streamState failed to get cf name: %s", funcname); \
code = -1; \
break; \
} \
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \
rocksdb_t* db = wrapper->rocksdb; \
rocksdb_writeoptions_t* opts = wrapper->writeOpts; \
char* ttlV = NULL; \
int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \
rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \
if (err != NULL) { \
stError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \
taosMemoryFree(err); \
code = -1; \
} else { \
stTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d", toString, funcname, vLen, ttlVLen); \
} \
taosMemoryFree(ttlV); \
#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \
do { \
code = 0; \
char buf[128] = {0}; \
char* err = NULL; \
int i = streamStateGetCfIdx(pState, funcname); \
if (i < 0) { \
stWarn("streamState failed to get cf name: %s", funcname); \
code = -1; \
break; \
} \
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \
rocksdb_t* db = wrapper->rocksdb; \
rocksdb_writeoptions_t* opts = wrapper->writeOpts; \
char* ttlV = NULL; \
int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \
rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \
if (err != NULL) { \
stError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \
taosMemoryFree(err); \
code = -1; \
} else { \
stTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d", toString, funcname, vLen, \
ttlVLen); \
} \
taosMemoryFree(ttlV); \
} while (0);
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
@ -1821,7 +1829,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
char* err = NULL; \
int i = streamStateGetCfIdx(pState, funcname); \
if (i < 0) { \
stWarn("streamState failed to get cf name: %s", funcname); \
stWarn("streamState failed to get cf name: %s", funcname); \
code = -1; \
break; \
} \
@ -1836,9 +1844,9 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
if (val == NULL || len == 0) { \
if (err == NULL) { \
stTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, wrapper->idstr, funcname); \
stTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, wrapper->idstr, funcname); \
} else { \
stError("streamState str: %s failed to read from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \
stError("streamState str: %s failed to read from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \
taosMemoryFreeClear(err); \
} \
code = -1; \
@ -1846,11 +1854,11 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
char* p = NULL; \
int32_t tlen = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \
if (tlen <= 0) { \
stError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, wrapper->idstr, \
funcname); \
stError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, wrapper->idstr, \
funcname); \
code = -1; \
} else { \
stTrace("streamState str: %s succ to read from %s_%s, valLen:%d", toString, wrapper->idstr, funcname, tlen); \
stTrace("streamState str: %s succ to read from %s_%s, valLen:%d", toString, wrapper->idstr, funcname, tlen); \
} \
taosMemoryFree(val); \
if (vLen != NULL) *vLen = tlen; \
@ -1864,7 +1872,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
char* err = NULL; \
int i = streamStateGetCfIdx(pState, funcname); \
if (i < 0) { \
stWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \
stWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \
code = -1; \
break; \
} \
@ -1877,11 +1885,11 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
rocksdb_writeoptions_t* opts = wrapper->writeOpts; \
rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, klen, &err); \
if (err != NULL) { \
stError("streamState str: %s failed to del from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \
stError("streamState str: %s failed to del from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \
taosMemoryFree(err); \
code = -1; \
} else { \
stTrace("streamState str: %s succ to del from %s_%s", toString, wrapper->idstr, funcname); \
stTrace("streamState str: %s succ to del from %s_%s", toString, wrapper->idstr, funcname); \
} \
} while (0);

View File

@ -149,7 +149,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->startInfo.pReadyTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK);
if (pMeta->startInfo.pReadyTaskSet == NULL) {
}
pMeta->pHbInfo = taosMemoryCalloc(1, sizeof(SMetaHbInfo));
@ -208,7 +207,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->numOfPausedTasks = 0;
pMeta->numOfStreamTasks = 0;
stInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId,
stage);
stage);
return pMeta;
_err:
@ -248,7 +247,7 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) {
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
stError("vgId:%d failed to rename file, from %s to %s, code:%s", pMeta->vgId, newPath, defaultPath,
tstrerror(terrno));
tstrerror(terrno));
taosMemoryFree(defaultPath);
taosMemoryFree(newPath);
@ -268,6 +267,8 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) {
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
streamBackendLoadCheckpointInfo(pMeta);
taosMemoryFree(defaultPath);
taosMemoryFree(newPath);
return 0;
}
@ -379,10 +380,10 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pTaskId) {
int64_t key[2] = {pTaskId->streamId, pTaskId->taskId};
int32_t code = tdbTbDelete(pMeta->pTaskDb, key, STREAM_TASK_KEY_LEN, pMeta->txn);
if (code != 0) {
stError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, (int32_t) pTaskId->taskId,
tstrerror(terrno));
stError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, (int32_t)pTaskId->taskId,
tstrerror(terrno));
} else {
stDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, (int32_t) pTaskId->taskId);
stDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, (int32_t)pTaskId->taskId);
}
return code;
@ -393,7 +394,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
*pAdded = false;
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (p == NULL) {
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
tFreeStreamTask(pTask);
@ -434,7 +435,7 @@ int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta) {
int32_t num = 0;
size_t size = taosArrayGetSize(pMeta->pTaskList);
for (int32_t i = 0; i < size; ++i) {
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
SStreamTask** p = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
if (p == NULL) {
continue;
@ -451,7 +452,7 @@ int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta) {
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
taosRLockLatch(&pMeta->lock);
STaskId id = {.streamId = streamId, .taskId = taskId};
STaskId id = {.streamId = streamId, .taskId = taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask != NULL) {
if (!streamTaskShouldStop(&(*ppTask)->status)) {
@ -495,7 +496,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
// pre-delete operation
taosWLockLatch(&pMeta->lock);
STaskId id = {.streamId = streamId, .taskId = taskId};
STaskId id = {.streamId = streamId, .taskId = taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask) {
pTask = *ppTask;
@ -512,7 +513,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
taosWUnLockLatch(&pMeta->lock);
stDebug("s-task:0x%x set task status:%s and start to unregister it", taskId,
streamGetTaskStatusStr(TASK_STATUS__DROPPING));
streamGetTaskStatusStr(TASK_STATUS__DROPPING));
while (1) {
taosRLockLatch(&pMeta->lock);
@ -650,7 +651,7 @@ static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) {
}
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
TBC* pCur = NULL;
TBC* pCur = NULL;
int32_t vgId = pMeta->vgId;
stInfo("vgId:%d load stream tasks from meta files", vgId);
@ -683,8 +684,10 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
doClear(pKey, pVal, pCur, pRecycleList);
tFreeStreamTask(pTask);
stError(
"vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream "
"manually", vgId, tsDataDir);
"vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild "
"stream "
"manually",
vgId, tsDataDir);
return -1;
}
tDecoderClear(&decoder);
@ -756,7 +759,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
ASSERT(pMeta->numOfStreamTasks <= numOfTasks && pMeta->numOfPausedTasks <= numOfTasks);
stDebug("vgId:%d load %d tasks into meta from disk completed, streamTask:%d, paused:%d", pMeta->vgId, numOfTasks,
pMeta->numOfStreamTasks, pMeta->numOfPausedTasks);
pMeta->numOfStreamTasks, pMeta->numOfPausedTasks);
taosArrayDestroy(pRecycleList);
return 0;
}
@ -803,7 +806,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
}
static bool waitForEnoughDuration(SMetaHbInfo* pInfo) {
if ((++pInfo->tickCounter) >= META_HB_SEND_IDLE_COUNTER) { // reset the counter
if ((++pInfo->tickCounter) >= META_HB_SEND_IDLE_COUNTER) { // reset the counter
pInfo->tickCounter = 0;
return true;
}
@ -915,7 +918,7 @@ void metaHbToMnode(void* param, void* tmrId) {
pMeta->pHbInfo->hbCount += 1;
stDebug("vgId:%d, build and send hb to mnode, numOfTasks:%d total:%d", pMeta->vgId, hbMsg.numOfTasks,
pMeta->pHbInfo->hbCount);
pMeta->pHbInfo->hbCount);
tmsgSendReq(&epset, &msg);
} else {
stDebug("vgId:%d no tasks and no mnd epset, not send stream hb to mnode", pMeta->vgId);
@ -952,7 +955,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
int32_t vgId = pMeta->vgId;
stDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb%" PRId64 ", totalHb:%d", vgId,
(pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount);
(pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount);
taosWLockLatch(&pMeta->lock);

View File

@ -17,8 +17,8 @@
#include "query.h"
#include "rocksdb/c.h"
#include "streamBackendRocksdb.h"
#include "tcommon.h"
#include "streamInt.h"
#include "tcommon.h"
enum SBackendFileType {
ROCKSDB_OPTIONS_TYPE = 1,
@ -126,7 +126,8 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk
stInfo("%s start to read snap %s", STREAM_STATE_TRANSFER, tdir);
streamBackendAddInUseChkp(pMeta, chkpId);
} else {
stWarn("%s failed to read from %s, reason: dir not exist,retry to default state dir", STREAM_STATE_TRANSFER, tdir);
stWarn("%s failed to read from %s, reason: dir not exist,retry to default state dir", STREAM_STATE_TRANSFER,
tdir);
}
}
@ -271,7 +272,7 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
if (handle->checkpointId == 0) {
// del tmp dir
if (taosIsDir(pFile->path)) {
if (pFile && taosIsDir(pFile->path)) {
taosRemoveDir(pFile->path);
}
} else {
@ -335,24 +336,24 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
} else {
pHandle->fd = streamOpenFile(pFile->path, item->name, TD_FILE_READ);
stDebug("%s open file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d", STREAM_STATE_TRANSFER,
item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
}
}
stDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64 ", file no.%d", STREAM_STATE_TRANSFER,
item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize);
int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset);
if (nread == -1) {
taosMemoryFree(buf);
code = TAOS_SYSTEM_ERROR(terrno);
stError("%s snap failed to read snap, file name:%s, type:%d,reason:%s", STREAM_STATE_TRANSFER, item->name,
item->type, tstrerror(code));
item->type, tstrerror(code));
return -1;
} else if (nread > 0 && nread <= kBlockSize) {
// left bytes less than kBlockSize
stDebug("%s read file %s, current offset:%" PRId64 ",size:% " PRId64 ", file no.%d", STREAM_STATE_TRANSFER,
item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
pHandle->offset += nread;
if (pHandle->offset >= item->size || nread < kBlockSize) {
taosCloseFile(&pHandle->fd);
@ -361,7 +362,7 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
}
} else {
stDebug("%s no data read, close file no.%d, move to next file, open and read", STREAM_STATE_TRANSFER,
pHandle->currFileIdx);
pHandle->currFileIdx);
taosCloseFile(&pHandle->fd);
pHandle->offset = 0;
pHandle->currFileIdx += 1;
@ -379,7 +380,7 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
pHandle->offset += nread;
stDebug("%s open file and read file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d",
STREAM_STATE_TRANSFER, item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
STREAM_STATE_TRANSFER, item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
}
SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)buf;
@ -434,8 +435,8 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pHandle->fd == NULL) {
code = TAOS_SYSTEM_ERROR(terrno);
stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP, pHdr->name,
tstrerror(code));
stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP,
pHdr->name, tstrerror(code));
}
}
@ -461,8 +462,8 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pHandle->fd == NULL) {
code = TAOS_SYSTEM_ERROR(terrno);
stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP, pHdr->name,
tstrerror(code));
stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP,
pHdr->name, tstrerror(code));
}
taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset);

View File

@ -195,7 +195,8 @@
,,n,system-test,python3 ./test.py -f 0-others/tag_index_basic.py
,,n,system-test,python3 ./test.py -f 0-others/udfpy_main.py
,,n,system-test,python3 ./test.py -N 3 -f 0-others/walRetention.py
,,n,system-test,python3 ./test.py -f 0-others/splitVGroup.py -N 5
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/splitVGroupRep1.py -N 3
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/splitVGroupRep3.py -N 3
,,n,system-test,python3 ./test.py -f 0-others/timeRangeWise.py -N 3
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_database.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_replica.py -N 3

View File

@ -0,0 +1,439 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import random
import time
import copy
import string
import taos
from util.log import *
from util.cases import *
from util.sql import *
class TDTestCase:
# random string
def random_string(self, count):
letters = string.ascii_letters
return ''.join(random.choice(letters) for i in range(count))
# get col value and total max min ...
def getColsValue(self, i, j):
# c1 value
if random.randint(1, 10) == 5:
c1 = None
else:
c1 = 1
# c2 value
if j % 3200 == 0:
c2 = 8764231
elif random.randint(1, 10) == 5:
c2 = None
else:
c2 = random.randint(-87654297, 98765321)
value = f"({self.ts}, "
# c1
if c1 is None:
value += "null,"
else:
self.c1Cnt += 1
value += f"{c1},"
# c2
if c2 is None:
value += "null,"
else:
value += f"{c2},"
# total count
self.c2Cnt += 1
# max
if self.c2Max is None:
self.c2Max = c2
else:
if c2 > self.c2Max:
self.c2Max = c2
# min
if self.c2Min is None:
self.c2Min = c2
else:
if c2 < self.c2Min:
self.c2Min = c2
# sum
if self.c2Sum is None:
self.c2Sum = c2
else:
self.c2Sum += c2
# c3 same with ts
value += f"{self.ts})"
# move next
self.ts += 1
return value
# insert data
def insertData(self):
tdLog.info("insert data ....")
sqls = ""
for i in range(self.childCnt):
# insert child table
values = ""
pre_insert = f"insert into @db_name.t{i} values "
for j in range(self.childRow):
if values == "":
values = self.getColsValue(i, j)
else:
values += "," + self.getColsValue(i, j)
# batch insert
if j % self.batchSize == 0 and values != "":
sql = pre_insert + values
self.exeDouble(sql)
values = ""
# append last
if values != "":
sql = pre_insert + values
self.exeDouble(sql)
values = ""
# insert nomal talbe
for i in range(20):
self.ts += 1000
name = self.random_string(20)
sql = f"insert into @db_name.ta values({self.ts}, {i}, {self.ts%100000}, '{name}', false)"
self.exeDouble(sql)
# insert finished
tdLog.info(f"insert data successfully.\n"
f" inserted child table = {self.childCnt}\n"
f" inserted child rows = {self.childRow}\n"
f" total inserted rows = {self.childCnt*self.childRow}\n")
return
def exeDouble(self, sql):
# dbname replace
sql1 = sql.replace("@db_name", self.db1)
if len(sql1) > 100:
tdLog.info(sql1[:100])
else:
tdLog.info(sql1)
tdSql.execute(sql1)
sql2 = sql.replace("@db_name", self.db2)
if len(sql2) > 100:
tdLog.info(sql2[:100])
else:
tdLog.info(sql2)
tdSql.execute(sql2)
# prepareEnv
def prepareEnv(self):
# init
self.ts = 1680000000000
self.childCnt = 4
self.childRow = 10000
self.batchSize = 50000
self.vgroups1 = 1
self.vgroups2 = 1
self.db1 = "db1"
self.db2 = "db2"
# total
self.c1Cnt = 0
self.c2Cnt = 0
self.c2Max = None
self.c2Min = None
self.c2Sum = None
# create database db
sql = f"create database @db_name vgroups {self.vgroups1} replica 1"
self.exeDouble(sql)
# create super talbe st
sql = f"create table @db_name.st(ts timestamp, c1 int, c2 bigint, ts1 timestamp) tags(area int)"
self.exeDouble(sql)
# create child table
for i in range(self.childCnt):
sql = f"create table @db_name.t{i} using @db_name.st tags({i}) "
self.exeDouble(sql)
# create normal table
sql = f"create table @db_name.ta(ts timestamp, c1 int, c2 bigint, c3 binary(32), c4 bool)"
self.exeDouble(sql)
# insert data
self.insertData()
# update
self.ts = 1680000000000 + 20000
self.childRow = 1000
# delete data
sql = "delete from @db_name.st where ts > 1680000019000 and ts < 1680000062000"
self.exeDouble(sql)
sql = "delete from @db_name.st where ts > 1680000099000 and ts < 1680000170000"
self.exeDouble(sql)
# check data correct
def checkExpect(self, sql, expectVal):
tdSql.query(sql)
rowCnt = tdSql.getRows()
for i in range(rowCnt):
val = tdSql.getData(i,0)
if val != expectVal:
tdLog.exit(f"Not expect . query={val} expect={expectVal} i={i} sql={sql}")
return False
tdLog.info(f"check expect ok. sql={sql} expect ={expectVal} rowCnt={rowCnt}")
return True
# init
def init(self, conn, logSql, replicaVar=1):
seed = time.time() % 10000
random.seed(seed)
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), True)
# check query result same
def queryDouble(self, sql):
# sql
sql1 = sql.replace('@db_name', self.db1)
tdLog.info(sql1)
start1 = time.time()
rows1 = tdSql.query(sql1)
spend1 = time.time() - start1
res1 = copy.copy(tdSql.queryResult)
sql2 = sql.replace('@db_name', self.db2)
tdLog.info(sql2)
start2 = time.time()
tdSql.query(sql2)
spend2 = time.time() - start2
res2 = tdSql.queryResult
rowlen1 = len(res1)
rowlen2 = len(res2)
if rowlen1 != rowlen2:
tdLog.exit(f"both row count not equal. rowlen1={rowlen1} rowlen2={rowlen2} ")
return False
for i in range(rowlen1):
row1 = res1[i]
row2 = res2[i]
collen1 = len(row1)
collen2 = len(row2)
if collen1 != collen2:
tdLog.exit(f"both col count not equal. collen1={collen1} collen2={collen2}")
return False
for j in range(collen1):
if row1[j] != row2[j]:
tdLog.exit(f"both col not equal. row={i} col={j} col1={row1[j]} col2={row2[j]} .")
return False
# warning performance
diff = (spend2 - spend1)*100/spend1
tdLog.info("spend1=%.6fs spend2=%.6fs diff=%.1f%%"%(spend1, spend2, diff))
if spend2 > spend1 and diff > 20:
tdLog.info("warning: the diff for performance after spliting is over 20%")
return True
# check result
def checkResult(self):
# check vgroupid
sql = f"select vgroup_id from information_schema.ins_vgroups where db_name='{self.db2}'"
tdSql.query(sql)
tdSql.checkRows(self.vgroups2)
# check child table count same
sql = "select table_name from information_schema.ins_tables where db_name='@db_name' order by table_name"
self.queryDouble(sql)
# check row value is ok
sql = "select * from @db_name.st order by ts"
self.queryDouble(sql)
# where
sql = "select *,tbname from @db_name.st where c1 < 1000 order by ts"
self.queryDouble(sql)
# max
sql = "select max(c1) from @db_name.st"
self.queryDouble(sql)
# min
sql = "select min(c2) from @db_name.st"
self.queryDouble(sql)
# sum
sql = "select sum(c1) from @db_name.st"
self.queryDouble(sql)
# normal table
# count
sql = "select count(*) from @db_name.ta"
self.queryDouble(sql)
# all rows
sql = "select * from @db_name.ta"
self.queryDouble(sql)
# sum
sql = "select sum(c1) from @db_name.ta"
self.queryDouble(sql)
# get vgroup list
def getVGroup(self, db_name):
vgidList = []
sql = f"select vgroup_id from information_schema.ins_vgroups where db_name='{db_name}'"
res = tdSql.getResult(sql)
rows = len(res)
for i in range(rows):
vgidList.append(res[i][0])
return vgidList;
# split vgroup on db2
def splitVGroup(self, db_name):
vgids = self.getVGroup(db_name)
selid = random.choice(vgids)
sql = f"split vgroup {selid}"
tdLog.info(sql)
tdSql.execute(sql)
# wait end
seconds = 300
for i in range(seconds):
sql ="show transactions;"
rows = tdSql.query(sql)
if rows == 0:
tdLog.info("split vgroup finished.")
return True
#tdLog.info(f"i={i} wait split vgroup ...")
time.sleep(1)
tdLog.exit(f"split vgroup transaction is not finished after executing {seconds}s")
return False
# split error
def expectSplitError(self, dbName):
vgids = self.getVGroup(dbName)
selid = random.choice(vgids)
sql = f"split vgroup {selid}"
tdLog.info(sql)
tdSql.error(sql)
# expect split ok
def expectSplitOk(self, dbName):
# split vgroup
vgList1 = self.getVGroup(dbName)
self.splitVGroup(dbName)
vgList2 = self.getVGroup(dbName)
vgNum1 = len(vgList1) + 1
vgNum2 = len(vgList2)
if vgNum1 != vgNum2:
tdLog.exit(f" vglist len={vgNum1} is not same for expect {vgNum2}")
return
# split empty database
def splitEmptyDB(self):
dbName = "emptydb"
vgNum = 2
# create database
sql = f"create database {dbName} vgroups {vgNum} replica 1"
tdLog.info(sql)
tdSql.execute(sql)
# split vgroup
self.expectSplitOk(dbName)
# forbid
def checkForbid(self):
# stream
tdLog.info("check forbid split having stream...")
tdSql.execute("create database streamdb;")
tdSql.execute("use streamdb;")
tdSql.execute("create table ta(ts timestamp, age int);")
tdSql.execute("create stream ma into sta as select count(*) from ta interval(1s);")
self.expectSplitError("streamdb")
tdSql.execute("drop stream ma;")
self.expectSplitOk("streamdb")
# topic
tdLog.info("check forbid split having topic...")
tdSql.execute("create database topicdb wal_retention_period 10;")
tdSql.execute("use topicdb;")
tdSql.execute("create table ta(ts timestamp, age int);")
tdSql.execute("create topic toa as select * from ta;")
self.expectSplitError("topicdb")
tdSql.execute("drop topic toa;")
self.expectSplitOk("topicdb")
# compact and check db2
def compactAndCheck(self):
tdLog.info("compact db2 and check result ...")
# compact
tdSql.execute(f"compact database {self.db2};")
# check result
self.checkResult()
# run
def run(self):
# prepare env
self.prepareEnv()
for i in range(3):
# split vgroup on db2
start = time.time()
self.splitVGroup(self.db2)
end = time.time()
self.vgroups2 += 1
# check two db query result same
self.checkResult()
spend = "%.3f"%(end-start)
tdLog.info(f"split vgroup i={i} passed. spend = {spend}s")
# split empty db
self.splitEmptyDB()
# check topic and stream forib
self.checkForbid()
# compact database
self.compactAndCheck()
# stop
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -137,10 +137,10 @@ class TDTestCase:
tdSql.execute(sql1)
sql2 = sql.replace("@db_name", self.db2)
if len(sql1) > 100:
tdLog.info(sql1[:100])
if len(sql2) > 100:
tdLog.info(sql2[:100])
else:
tdLog.info(sql1)
tdLog.info(sql2)
tdSql.execute(sql2)
@ -151,8 +151,8 @@ class TDTestCase:
self.childCnt = 10
self.childRow = 10000
self.batchSize = 5000
self.vgroups1 = 4
self.vgroups2 = 4
self.vgroups1 = 2
self.vgroups2 = 2
self.db1 = "db1"
self.db2 = "db2"
@ -183,6 +183,16 @@ class TDTestCase:
# insert data
self.insertData()
# update
self.ts = 1680000000000 + 10000
self.childRow = 2000
# delete data
sql = "delete from @db_name.st where ts > 1680000001900 and ts < 1680000012000"
self.exeDouble(sql)
sql = "delete from @db_name.st where ts > 1680000029000 and ts < 1680000048000"
self.exeDouble(sql)
# check data correct
def checkExpect(self, sql, expectVal):
tdSql.query(sql)
@ -225,7 +235,7 @@ class TDTestCase:
rowlen2 = len(res2)
if rowlen1 != rowlen2:
tdLog.exit(f"rowlen1={rowlen1} rowlen2={rowlen2} both not equal.")
tdLog.exit(f"both row count not equal. rowlen1={rowlen1} rowlen2={rowlen2} ")
return False
for i in range(rowlen1):
@ -234,11 +244,11 @@ class TDTestCase:
collen1 = len(row1)
collen2 = len(row2)
if collen1 != collen2:
tdLog.exit(f"collen1={collen1} collen2={collen2} both not equal.")
tdLog.exit(f"both col count not equal. collen1={collen1} collen2={collen2}")
return False
for j in range(collen1):
if row1[j] != row2[j]:
tdLog.exit(f"col={j} col1={row1[j]} col2={row2[j]} both col not equal.")
tdLog.exit(f"both col not equal. row={i} col={j} col1={row1[j]} col2={row2[j]} .")
return False
# warning performance
@ -354,7 +364,7 @@ class TDTestCase:
dbName = "emptydb"
vgNum = 2
# create database
sql = f"create database {dbName} vgroups {vgNum}"
sql = f"create database {dbName} vgroups {vgNum} replica 3"
tdLog.info(sql)
tdSql.execute(sql)
@ -397,7 +407,7 @@ class TDTestCase:
# prepare env
self.prepareEnv()
for i in range(3):
for i in range(2):
# split vgroup on db2
start = time.time()
self.splitVGroup(self.db2)