diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index b8d5c35dfb..41392ba27b 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -105,6 +105,7 @@ int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData) { pHdr->size = len; memcpy(pHdr->data, rowData, len); tqDebug("vgId:%d, vnode stream-state snapshot read data success", TD_VID(pReader->pTq->pVnode)); + taosMemoryFree(rowData); return code; _err: diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index a419a59996..815f8b0728 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -459,7 +459,7 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { } else { stError("failed to start stream backend at %s, reason: %s, restart from default state dir:%s", chkp, - tstrerror(TAOS_SYSTEM_ERROR(errno)), state); + tstrerror(TAOS_SYSTEM_ERROR(errno)), state); taosMkDir(state); } taosMemoryFree(chkp); @@ -813,6 +813,10 @@ int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t* } int32_t nCf = taosArrayGetSize(pHandle); + if (nCf == 0) { + taosArrayDestroy(pHandle); + return nCf; + } rocksdb_column_family_handle_t** ppCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*)); for (int i = 0; i < nCf; i++) { @@ -845,6 +849,9 @@ _ERROR: return code; } int32_t chkpPreFlushDb(rocksdb_t* db, rocksdb_column_family_handle_t** cf, int32_t nCf) { + if (nCf == 0) { + return 0; + } int code = 0; char* err = NULL; @@ -910,7 +917,7 @@ int32_t streamBackendTriggerChkp(void* arg, char* dst) { stError("stream backend:%p failed to do checkpoint at:%s", pHandle, dst); } else { stDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, dst, - taosGetTimestampMs() - st); + taosGetTimestampMs() - st); } } else { stError("stream backend:%p failed to flush db at:%s", pHandle, dst); @@ -985,9 +992,9 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { stError("stream backend:%p failed to do checkpoint at:%s", pHandle, pChkpIdDir); } else { stDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, pChkpIdDir, - taosGetTimestampMs() - st); + taosGetTimestampMs() - st); } - } else { + } else { stError("stream backend:%p failed to flush db at:%s", pHandle, pChkpIdDir); } // release all ref to cfWrapper; @@ -1711,7 +1718,7 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) { char* status[] = {"close", "drop"}; stInfo("start to %s state %p on backendWrapper %p %s", status[remove == false ? 0 : 1], pState, wrapper, - wrapper->idstr); + wrapper->idstr); wrapper->remove |= remove; // update by other pState taosReleaseRef(streamBackendCfWrapperId, pState->pTdbState->backendCfWrapperId); } @@ -1783,35 +1790,36 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe ((rocksdb_column_family_handle_t**)wrapper->pHandle)[idx]); } -#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \ - do { \ - code = 0; \ - char buf[128] = {0}; \ - char* err = NULL; \ - int i = streamStateGetCfIdx(pState, funcname); \ - if (i < 0) { \ - stWarn("streamState failed to get cf name: %s", funcname); \ - code = -1; \ - break; \ - } \ - SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; \ - char toString[128] = {0}; \ - if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ - int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ - rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \ - rocksdb_t* db = wrapper->rocksdb; \ - rocksdb_writeoptions_t* opts = wrapper->writeOpts; \ - char* ttlV = NULL; \ - int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \ - rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \ - if (err != NULL) { \ - stError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \ - taosMemoryFree(err); \ - code = -1; \ - } else { \ - stTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d", toString, funcname, vLen, ttlVLen); \ - } \ - taosMemoryFree(ttlV); \ +#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \ + do { \ + code = 0; \ + char buf[128] = {0}; \ + char* err = NULL; \ + int i = streamStateGetCfIdx(pState, funcname); \ + if (i < 0) { \ + stWarn("streamState failed to get cf name: %s", funcname); \ + code = -1; \ + break; \ + } \ + SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; \ + char toString[128] = {0}; \ + if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ + int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ + rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \ + rocksdb_t* db = wrapper->rocksdb; \ + rocksdb_writeoptions_t* opts = wrapper->writeOpts; \ + char* ttlV = NULL; \ + int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \ + rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \ + if (err != NULL) { \ + stError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \ + taosMemoryFree(err); \ + code = -1; \ + } else { \ + stTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d", toString, funcname, vLen, \ + ttlVLen); \ + } \ + taosMemoryFree(ttlV); \ } while (0); #define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \ @@ -1821,7 +1829,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe char* err = NULL; \ int i = streamStateGetCfIdx(pState, funcname); \ if (i < 0) { \ - stWarn("streamState failed to get cf name: %s", funcname); \ + stWarn("streamState failed to get cf name: %s", funcname); \ code = -1; \ break; \ } \ @@ -1836,9 +1844,9 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \ if (val == NULL || len == 0) { \ if (err == NULL) { \ - stTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, wrapper->idstr, funcname); \ + stTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, wrapper->idstr, funcname); \ } else { \ - stError("streamState str: %s failed to read from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \ + stError("streamState str: %s failed to read from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \ taosMemoryFreeClear(err); \ } \ code = -1; \ @@ -1846,11 +1854,11 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe char* p = NULL; \ int32_t tlen = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \ if (tlen <= 0) { \ - stError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, wrapper->idstr, \ - funcname); \ + stError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, wrapper->idstr, \ + funcname); \ code = -1; \ } else { \ - stTrace("streamState str: %s succ to read from %s_%s, valLen:%d", toString, wrapper->idstr, funcname, tlen); \ + stTrace("streamState str: %s succ to read from %s_%s, valLen:%d", toString, wrapper->idstr, funcname, tlen); \ } \ taosMemoryFree(val); \ if (vLen != NULL) *vLen = tlen; \ @@ -1864,7 +1872,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe char* err = NULL; \ int i = streamStateGetCfIdx(pState, funcname); \ if (i < 0) { \ - stWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \ + stWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \ code = -1; \ break; \ } \ @@ -1877,11 +1885,11 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe rocksdb_writeoptions_t* opts = wrapper->writeOpts; \ rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, klen, &err); \ if (err != NULL) { \ - stError("streamState str: %s failed to del from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \ + stError("streamState str: %s failed to del from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \ taosMemoryFree(err); \ code = -1; \ } else { \ - stTrace("streamState str: %s succ to del from %s_%s", toString, wrapper->idstr, funcname); \ + stTrace("streamState str: %s succ to del from %s_%s", toString, wrapper->idstr, funcname); \ } \ } while (0); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 45c2743ecb..020f2de326 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -149,7 +149,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->startInfo.pReadyTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK); if (pMeta->startInfo.pReadyTaskSet == NULL) { - } pMeta->pHbInfo = taosMemoryCalloc(1, sizeof(SMetaHbInfo)); @@ -208,7 +207,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->numOfPausedTasks = 0; pMeta->numOfStreamTasks = 0; stInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, - stage); + stage); return pMeta; _err: @@ -248,7 +247,7 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) { if (code != 0) { terrno = TAOS_SYSTEM_ERROR(code); stError("vgId:%d failed to rename file, from %s to %s, code:%s", pMeta->vgId, newPath, defaultPath, - tstrerror(terrno)); + tstrerror(terrno)); taosMemoryFree(defaultPath); taosMemoryFree(newPath); @@ -268,6 +267,8 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) { pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); streamBackendLoadCheckpointInfo(pMeta); + taosMemoryFree(defaultPath); + taosMemoryFree(newPath); return 0; } @@ -379,10 +380,10 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pTaskId) { int64_t key[2] = {pTaskId->streamId, pTaskId->taskId}; int32_t code = tdbTbDelete(pMeta->pTaskDb, key, STREAM_TASK_KEY_LEN, pMeta->txn); if (code != 0) { - stError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, (int32_t) pTaskId->taskId, - tstrerror(terrno)); + stError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, (int32_t)pTaskId->taskId, + tstrerror(terrno)); } else { - stDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, (int32_t) pTaskId->taskId); + stDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, (int32_t)pTaskId->taskId); } return code; @@ -393,7 +394,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa *pAdded = false; STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; - void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); + void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (p == NULL) { if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { tFreeStreamTask(pTask); @@ -434,7 +435,7 @@ int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta) { int32_t num = 0; size_t size = taosArrayGetSize(pMeta->pTaskList); for (int32_t i = 0; i < size; ++i) { - STaskId* pId = taosArrayGet(pMeta->pTaskList, i); + STaskId* pId = taosArrayGet(pMeta->pTaskList, i); SStreamTask** p = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId)); if (p == NULL) { continue; @@ -451,7 +452,7 @@ int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta) { SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { taosRLockLatch(&pMeta->lock); - STaskId id = {.streamId = streamId, .taskId = taskId}; + STaskId id = {.streamId = streamId, .taskId = taskId}; SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask != NULL) { if (!streamTaskShouldStop(&(*ppTask)->status)) { @@ -495,7 +496,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t // pre-delete operation taosWLockLatch(&pMeta->lock); - STaskId id = {.streamId = streamId, .taskId = taskId}; + STaskId id = {.streamId = streamId, .taskId = taskId}; SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask) { pTask = *ppTask; @@ -512,7 +513,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t taosWUnLockLatch(&pMeta->lock); stDebug("s-task:0x%x set task status:%s and start to unregister it", taskId, - streamGetTaskStatusStr(TASK_STATUS__DROPPING)); + streamGetTaskStatusStr(TASK_STATUS__DROPPING)); while (1) { taosRLockLatch(&pMeta->lock); @@ -650,7 +651,7 @@ static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) { } int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { - TBC* pCur = NULL; + TBC* pCur = NULL; int32_t vgId = pMeta->vgId; stInfo("vgId:%d load stream tasks from meta files", vgId); @@ -683,8 +684,10 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { doClear(pKey, pVal, pCur, pRecycleList); tFreeStreamTask(pTask); stError( - "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream " - "manually", vgId, tsDataDir); + "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild " + "stream " + "manually", + vgId, tsDataDir); return -1; } tDecoderClear(&decoder); @@ -756,7 +759,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); ASSERT(pMeta->numOfStreamTasks <= numOfTasks && pMeta->numOfPausedTasks <= numOfTasks); stDebug("vgId:%d load %d tasks into meta from disk completed, streamTask:%d, paused:%d", pMeta->vgId, numOfTasks, - pMeta->numOfStreamTasks, pMeta->numOfPausedTasks); + pMeta->numOfStreamTasks, pMeta->numOfPausedTasks); taosArrayDestroy(pRecycleList); return 0; } @@ -803,7 +806,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { } static bool waitForEnoughDuration(SMetaHbInfo* pInfo) { - if ((++pInfo->tickCounter) >= META_HB_SEND_IDLE_COUNTER) { // reset the counter + if ((++pInfo->tickCounter) >= META_HB_SEND_IDLE_COUNTER) { // reset the counter pInfo->tickCounter = 0; return true; } @@ -915,7 +918,7 @@ void metaHbToMnode(void* param, void* tmrId) { pMeta->pHbInfo->hbCount += 1; stDebug("vgId:%d, build and send hb to mnode, numOfTasks:%d total:%d", pMeta->vgId, hbMsg.numOfTasks, - pMeta->pHbInfo->hbCount); + pMeta->pHbInfo->hbCount); tmsgSendReq(&epset, &msg); } else { stDebug("vgId:%d no tasks and no mnd epset, not send stream hb to mnode", pMeta->vgId); @@ -952,7 +955,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { int32_t vgId = pMeta->vgId; stDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb%" PRId64 ", totalHb:%d", vgId, - (pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount); + (pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount); taosWLockLatch(&pMeta->lock); diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index d988d242c8..f66edb2b34 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -17,8 +17,8 @@ #include "query.h" #include "rocksdb/c.h" #include "streamBackendRocksdb.h" -#include "tcommon.h" #include "streamInt.h" +#include "tcommon.h" enum SBackendFileType { ROCKSDB_OPTIONS_TYPE = 1, @@ -126,7 +126,8 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk stInfo("%s start to read snap %s", STREAM_STATE_TRANSFER, tdir); streamBackendAddInUseChkp(pMeta, chkpId); } else { - stWarn("%s failed to read from %s, reason: dir not exist,retry to default state dir", STREAM_STATE_TRANSFER, tdir); + stWarn("%s failed to read from %s, reason: dir not exist,retry to default state dir", STREAM_STATE_TRANSFER, + tdir); } } @@ -271,7 +272,7 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) { if (handle->checkpointId == 0) { // del tmp dir - if (taosIsDir(pFile->path)) { + if (pFile && taosIsDir(pFile->path)) { taosRemoveDir(pFile->path); } } else { @@ -335,24 +336,24 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si } else { pHandle->fd = streamOpenFile(pFile->path, item->name, TD_FILE_READ); stDebug("%s open file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d", STREAM_STATE_TRANSFER, - item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); + item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); } } stDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64 ", file no.%d", STREAM_STATE_TRANSFER, - item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); + item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize); int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset); if (nread == -1) { taosMemoryFree(buf); code = TAOS_SYSTEM_ERROR(terrno); stError("%s snap failed to read snap, file name:%s, type:%d,reason:%s", STREAM_STATE_TRANSFER, item->name, - item->type, tstrerror(code)); + item->type, tstrerror(code)); return -1; } else if (nread > 0 && nread <= kBlockSize) { // left bytes less than kBlockSize stDebug("%s read file %s, current offset:%" PRId64 ",size:% " PRId64 ", file no.%d", STREAM_STATE_TRANSFER, - item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); + item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); pHandle->offset += nread; if (pHandle->offset >= item->size || nread < kBlockSize) { taosCloseFile(&pHandle->fd); @@ -361,7 +362,7 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si } } else { stDebug("%s no data read, close file no.%d, move to next file, open and read", STREAM_STATE_TRANSFER, - pHandle->currFileIdx); + pHandle->currFileIdx); taosCloseFile(&pHandle->fd); pHandle->offset = 0; pHandle->currFileIdx += 1; @@ -379,7 +380,7 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si pHandle->offset += nread; stDebug("%s open file and read file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d", - STREAM_STATE_TRANSFER, item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); + STREAM_STATE_TRANSFER, item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); } SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)buf; @@ -434,8 +435,8 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); if (pHandle->fd == NULL) { code = TAOS_SYSTEM_ERROR(terrno); - stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP, pHdr->name, - tstrerror(code)); + stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP, + pHdr->name, tstrerror(code)); } } @@ -461,8 +462,8 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); if (pHandle->fd == NULL) { code = TAOS_SYSTEM_ERROR(terrno); - stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP, pHdr->name, - tstrerror(code)); + stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP, + pHdr->name, tstrerror(code)); } taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset); diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index b08fa491f4..1627b3e5b4 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -195,7 +195,8 @@ ,,n,system-test,python3 ./test.py -f 0-others/tag_index_basic.py ,,n,system-test,python3 ./test.py -f 0-others/udfpy_main.py ,,n,system-test,python3 ./test.py -N 3 -f 0-others/walRetention.py -,,n,system-test,python3 ./test.py -f 0-others/splitVGroup.py -N 5 +,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/splitVGroupRep1.py -N 3 +,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/splitVGroupRep3.py -N 3 ,,n,system-test,python3 ./test.py -f 0-others/timeRangeWise.py -N 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_database.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_replica.py -N 3 diff --git a/tests/system-test/0-others/splitVGroupRep1.py b/tests/system-test/0-others/splitVGroupRep1.py new file mode 100644 index 0000000000..4db5201011 --- /dev/null +++ b/tests/system-test/0-others/splitVGroupRep1.py @@ -0,0 +1,439 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import random +import time +import copy +import string + +import taos +from util.log import * +from util.cases import * +from util.sql import * + +class TDTestCase: + + # random string + def random_string(self, count): + letters = string.ascii_letters + return ''.join(random.choice(letters) for i in range(count)) + + # get col value and total max min ... + def getColsValue(self, i, j): + # c1 value + if random.randint(1, 10) == 5: + c1 = None + else: + c1 = 1 + + # c2 value + if j % 3200 == 0: + c2 = 8764231 + elif random.randint(1, 10) == 5: + c2 = None + else: + c2 = random.randint(-87654297, 98765321) + + + value = f"({self.ts}, " + + # c1 + if c1 is None: + value += "null," + else: + self.c1Cnt += 1 + value += f"{c1}," + # c2 + if c2 is None: + value += "null," + else: + value += f"{c2}," + # total count + self.c2Cnt += 1 + # max + if self.c2Max is None: + self.c2Max = c2 + else: + if c2 > self.c2Max: + self.c2Max = c2 + # min + if self.c2Min is None: + self.c2Min = c2 + else: + if c2 < self.c2Min: + self.c2Min = c2 + # sum + if self.c2Sum is None: + self.c2Sum = c2 + else: + self.c2Sum += c2 + + # c3 same with ts + value += f"{self.ts})" + + # move next + self.ts += 1 + + return value + + # insert data + def insertData(self): + tdLog.info("insert data ....") + sqls = "" + for i in range(self.childCnt): + # insert child table + values = "" + pre_insert = f"insert into @db_name.t{i} values " + for j in range(self.childRow): + if values == "": + values = self.getColsValue(i, j) + else: + values += "," + self.getColsValue(i, j) + + # batch insert + if j % self.batchSize == 0 and values != "": + sql = pre_insert + values + self.exeDouble(sql) + values = "" + # append last + if values != "": + sql = pre_insert + values + self.exeDouble(sql) + values = "" + + # insert nomal talbe + for i in range(20): + self.ts += 1000 + name = self.random_string(20) + sql = f"insert into @db_name.ta values({self.ts}, {i}, {self.ts%100000}, '{name}', false)" + self.exeDouble(sql) + + # insert finished + tdLog.info(f"insert data successfully.\n" + f" inserted child table = {self.childCnt}\n" + f" inserted child rows = {self.childRow}\n" + f" total inserted rows = {self.childCnt*self.childRow}\n") + return + + def exeDouble(self, sql): + # dbname replace + sql1 = sql.replace("@db_name", self.db1) + + if len(sql1) > 100: + tdLog.info(sql1[:100]) + else: + tdLog.info(sql1) + tdSql.execute(sql1) + + sql2 = sql.replace("@db_name", self.db2) + if len(sql2) > 100: + tdLog.info(sql2[:100]) + else: + tdLog.info(sql2) + tdSql.execute(sql2) + + + # prepareEnv + def prepareEnv(self): + # init + self.ts = 1680000000000 + self.childCnt = 4 + self.childRow = 10000 + self.batchSize = 50000 + self.vgroups1 = 1 + self.vgroups2 = 1 + self.db1 = "db1" + self.db2 = "db2" + + # total + self.c1Cnt = 0 + self.c2Cnt = 0 + self.c2Max = None + self.c2Min = None + self.c2Sum = None + + # create database db + sql = f"create database @db_name vgroups {self.vgroups1} replica 1" + self.exeDouble(sql) + + # create super talbe st + sql = f"create table @db_name.st(ts timestamp, c1 int, c2 bigint, ts1 timestamp) tags(area int)" + self.exeDouble(sql) + + # create child table + for i in range(self.childCnt): + sql = f"create table @db_name.t{i} using @db_name.st tags({i}) " + self.exeDouble(sql) + + # create normal table + sql = f"create table @db_name.ta(ts timestamp, c1 int, c2 bigint, c3 binary(32), c4 bool)" + self.exeDouble(sql) + + # insert data + self.insertData() + + # update + self.ts = 1680000000000 + 20000 + self.childRow = 1000 + + + # delete data + sql = "delete from @db_name.st where ts > 1680000019000 and ts < 1680000062000" + self.exeDouble(sql) + sql = "delete from @db_name.st where ts > 1680000099000 and ts < 1680000170000" + self.exeDouble(sql) + + # check data correct + def checkExpect(self, sql, expectVal): + tdSql.query(sql) + rowCnt = tdSql.getRows() + for i in range(rowCnt): + val = tdSql.getData(i,0) + if val != expectVal: + tdLog.exit(f"Not expect . query={val} expect={expectVal} i={i} sql={sql}") + return False + + tdLog.info(f"check expect ok. sql={sql} expect ={expectVal} rowCnt={rowCnt}") + return True + + # init + def init(self, conn, logSql, replicaVar=1): + seed = time.time() % 10000 + random.seed(seed) + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), True) + + # check query result same + def queryDouble(self, sql): + # sql + sql1 = sql.replace('@db_name', self.db1) + tdLog.info(sql1) + start1 = time.time() + rows1 = tdSql.query(sql1) + spend1 = time.time() - start1 + res1 = copy.copy(tdSql.queryResult) + + sql2 = sql.replace('@db_name', self.db2) + tdLog.info(sql2) + start2 = time.time() + tdSql.query(sql2) + spend2 = time.time() - start2 + res2 = tdSql.queryResult + + rowlen1 = len(res1) + rowlen2 = len(res2) + + if rowlen1 != rowlen2: + tdLog.exit(f"both row count not equal. rowlen1={rowlen1} rowlen2={rowlen2} ") + return False + + for i in range(rowlen1): + row1 = res1[i] + row2 = res2[i] + collen1 = len(row1) + collen2 = len(row2) + if collen1 != collen2: + tdLog.exit(f"both col count not equal. collen1={collen1} collen2={collen2}") + return False + for j in range(collen1): + if row1[j] != row2[j]: + tdLog.exit(f"both col not equal. row={i} col={j} col1={row1[j]} col2={row2[j]} .") + return False + + # warning performance + diff = (spend2 - spend1)*100/spend1 + tdLog.info("spend1=%.6fs spend2=%.6fs diff=%.1f%%"%(spend1, spend2, diff)) + if spend2 > spend1 and diff > 20: + tdLog.info("warning: the diff for performance after spliting is over 20%") + + return True + + + # check result + def checkResult(self): + # check vgroupid + sql = f"select vgroup_id from information_schema.ins_vgroups where db_name='{self.db2}'" + tdSql.query(sql) + tdSql.checkRows(self.vgroups2) + + # check child table count same + sql = "select table_name from information_schema.ins_tables where db_name='@db_name' order by table_name" + self.queryDouble(sql) + + # check row value is ok + sql = "select * from @db_name.st order by ts" + self.queryDouble(sql) + + # where + sql = "select *,tbname from @db_name.st where c1 < 1000 order by ts" + self.queryDouble(sql) + + # max + sql = "select max(c1) from @db_name.st" + self.queryDouble(sql) + + # min + sql = "select min(c2) from @db_name.st" + self.queryDouble(sql) + + # sum + sql = "select sum(c1) from @db_name.st" + self.queryDouble(sql) + + # normal table + + # count + sql = "select count(*) from @db_name.ta" + self.queryDouble(sql) + + # all rows + sql = "select * from @db_name.ta" + self.queryDouble(sql) + + # sum + sql = "select sum(c1) from @db_name.ta" + self.queryDouble(sql) + + + # get vgroup list + def getVGroup(self, db_name): + vgidList = [] + sql = f"select vgroup_id from information_schema.ins_vgroups where db_name='{db_name}'" + res = tdSql.getResult(sql) + rows = len(res) + for i in range(rows): + vgidList.append(res[i][0]) + + return vgidList; + + # split vgroup on db2 + def splitVGroup(self, db_name): + vgids = self.getVGroup(db_name) + selid = random.choice(vgids) + sql = f"split vgroup {selid}" + tdLog.info(sql) + tdSql.execute(sql) + + # wait end + seconds = 300 + for i in range(seconds): + sql ="show transactions;" + rows = tdSql.query(sql) + if rows == 0: + tdLog.info("split vgroup finished.") + return True + #tdLog.info(f"i={i} wait split vgroup ...") + time.sleep(1) + + tdLog.exit(f"split vgroup transaction is not finished after executing {seconds}s") + return False + + # split error + def expectSplitError(self, dbName): + vgids = self.getVGroup(dbName) + selid = random.choice(vgids) + sql = f"split vgroup {selid}" + tdLog.info(sql) + tdSql.error(sql) + + # expect split ok + def expectSplitOk(self, dbName): + # split vgroup + vgList1 = self.getVGroup(dbName) + self.splitVGroup(dbName) + vgList2 = self.getVGroup(dbName) + vgNum1 = len(vgList1) + 1 + vgNum2 = len(vgList2) + if vgNum1 != vgNum2: + tdLog.exit(f" vglist len={vgNum1} is not same for expect {vgNum2}") + return + + # split empty database + def splitEmptyDB(self): + dbName = "emptydb" + vgNum = 2 + # create database + sql = f"create database {dbName} vgroups {vgNum} replica 1" + tdLog.info(sql) + tdSql.execute(sql) + + # split vgroup + self.expectSplitOk(dbName) + + + # forbid + def checkForbid(self): + # stream + tdLog.info("check forbid split having stream...") + tdSql.execute("create database streamdb;") + tdSql.execute("use streamdb;") + tdSql.execute("create table ta(ts timestamp, age int);") + tdSql.execute("create stream ma into sta as select count(*) from ta interval(1s);") + self.expectSplitError("streamdb") + tdSql.execute("drop stream ma;") + self.expectSplitOk("streamdb") + + # topic + tdLog.info("check forbid split having topic...") + tdSql.execute("create database topicdb wal_retention_period 10;") + tdSql.execute("use topicdb;") + tdSql.execute("create table ta(ts timestamp, age int);") + tdSql.execute("create topic toa as select * from ta;") + self.expectSplitError("topicdb") + tdSql.execute("drop topic toa;") + self.expectSplitOk("topicdb") + + # compact and check db2 + def compactAndCheck(self): + tdLog.info("compact db2 and check result ...") + # compact + tdSql.execute(f"compact database {self.db2};") + # check result + self.checkResult() + + # run + def run(self): + # prepare env + self.prepareEnv() + + for i in range(3): + # split vgroup on db2 + start = time.time() + self.splitVGroup(self.db2) + end = time.time() + self.vgroups2 += 1 + + # check two db query result same + self.checkResult() + spend = "%.3f"%(end-start) + tdLog.info(f"split vgroup i={i} passed. spend = {spend}s") + + # split empty db + self.splitEmptyDB() + + # check topic and stream forib + self.checkForbid() + + # compact database + self.compactAndCheck() + + # stop + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/0-others/splitVGroup.py b/tests/system-test/0-others/splitVGroupRep3.py similarity index 93% rename from tests/system-test/0-others/splitVGroup.py rename to tests/system-test/0-others/splitVGroupRep3.py index e9ec34f829..d32a9747b5 100644 --- a/tests/system-test/0-others/splitVGroup.py +++ b/tests/system-test/0-others/splitVGroupRep3.py @@ -137,10 +137,10 @@ class TDTestCase: tdSql.execute(sql1) sql2 = sql.replace("@db_name", self.db2) - if len(sql1) > 100: - tdLog.info(sql1[:100]) + if len(sql2) > 100: + tdLog.info(sql2[:100]) else: - tdLog.info(sql1) + tdLog.info(sql2) tdSql.execute(sql2) @@ -151,8 +151,8 @@ class TDTestCase: self.childCnt = 10 self.childRow = 10000 self.batchSize = 5000 - self.vgroups1 = 4 - self.vgroups2 = 4 + self.vgroups1 = 2 + self.vgroups2 = 2 self.db1 = "db1" self.db2 = "db2" @@ -183,6 +183,16 @@ class TDTestCase: # insert data self.insertData() + # update + self.ts = 1680000000000 + 10000 + self.childRow = 2000 + + # delete data + sql = "delete from @db_name.st where ts > 1680000001900 and ts < 1680000012000" + self.exeDouble(sql) + sql = "delete from @db_name.st where ts > 1680000029000 and ts < 1680000048000" + self.exeDouble(sql) + # check data correct def checkExpect(self, sql, expectVal): tdSql.query(sql) @@ -225,7 +235,7 @@ class TDTestCase: rowlen2 = len(res2) if rowlen1 != rowlen2: - tdLog.exit(f"rowlen1={rowlen1} rowlen2={rowlen2} both not equal.") + tdLog.exit(f"both row count not equal. rowlen1={rowlen1} rowlen2={rowlen2} ") return False for i in range(rowlen1): @@ -234,11 +244,11 @@ class TDTestCase: collen1 = len(row1) collen2 = len(row2) if collen1 != collen2: - tdLog.exit(f"collen1={collen1} collen2={collen2} both not equal.") + tdLog.exit(f"both col count not equal. collen1={collen1} collen2={collen2}") return False for j in range(collen1): if row1[j] != row2[j]: - tdLog.exit(f"col={j} col1={row1[j]} col2={row2[j]} both col not equal.") + tdLog.exit(f"both col not equal. row={i} col={j} col1={row1[j]} col2={row2[j]} .") return False # warning performance @@ -354,7 +364,7 @@ class TDTestCase: dbName = "emptydb" vgNum = 2 # create database - sql = f"create database {dbName} vgroups {vgNum}" + sql = f"create database {dbName} vgroups {vgNum} replica 3" tdLog.info(sql) tdSql.execute(sql) @@ -397,7 +407,7 @@ class TDTestCase: # prepare env self.prepareEnv() - for i in range(3): + for i in range(2): # split vgroup on db2 start = time.time() self.splitVGroup(self.db2)