diff --git a/include/dnode/mgmt/dnode.h b/include/dnode/mgmt/dnode.h index 82823e3f57..15cb6d59aa 100644 --- a/include/dnode/mgmt/dnode.h +++ b/include/dnode/mgmt/dnode.h @@ -44,6 +44,11 @@ int32_t dmRun(); */ void dmStop(); +/** + * for tests + */ +bool dmReadyForTest(); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index 8e760c28be..5b1f31e6c6 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -415,3 +415,7 @@ void dmReportStartup(const char *pName, const char *pDesc) { } int64_t dmGetClusterId() { return globalDnode.data.clusterId; } + +bool dmReadyForTest() { + return dmInstance()->data.dnodeVer > 0; +} diff --git a/source/dnode/mgmt/test/sut/inc/server.h b/source/dnode/mgmt/test/sut/inc/server.h index 7343276210..78055393b5 100644 --- a/source/dnode/mgmt/test/sut/inc/server.h +++ b/source/dnode/mgmt/test/sut/inc/server.h @@ -20,10 +20,10 @@ class TestServer { public: bool Start(); void Stop(); -bool runnning; + bool running; private: TdThread threadId; }; -#endif /* _TD_TEST_SERVER_H_ */ \ No newline at end of file +#endif /* _TD_TEST_SERVER_H_ */ diff --git a/source/dnode/mgmt/test/sut/src/server.cpp b/source/dnode/mgmt/test/sut/src/server.cpp index 541c5a42f4..2218504df4 100644 --- a/source/dnode/mgmt/test/sut/src/server.cpp +++ b/source/dnode/mgmt/test/sut/src/server.cpp @@ -17,13 +17,11 @@ void* serverLoop(void* param) { TestServer* server = (TestServer*)param; - server->runnning = false; if (dmInit() != 0) { return NULL; } - server->runnning = true; if (dmRun() != 0) { return NULL; } @@ -33,13 +31,18 @@ void* serverLoop(void* param) { } bool TestServer::Start() { + tstrncpy(tsVersionName, "trial", strlen("trial")); + running = false; TdThreadAttr thAttr; taosThreadAttrInit(&thAttr); taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); taosThreadCreate(&threadId, &thAttr, serverLoop, this); taosThreadAttrDestroy(&thAttr); - taosMsleep(10000); - return runnning; + while (!dmReadyForTest()) { + taosMsleep(500); + } + running = true; + return running; } void TestServer::Stop() { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 44fe81ac09..48549fce42 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1639,11 +1639,11 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS // info if (pTask->info.taskLevel == TASK_LEVEL__SINK) { const char *sinkStr = "%.2fMiB"; - sprintf(buf, sinkStr, pe->sinkDataSize); + snprintf(buf, tListLen(buf), sinkStr, pe->sinkDataSize); } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { // offset info const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]"; - sprintf(buf, offsetStr, pe->processedVer, pe->verRange.minVer, pe->verRange.maxVer); + snprintf(buf, tListLen(buf), offsetStr, pe->processedVer, pe->verRange.minVer, pe->verRange.maxVer); } STR_TO_VARSTR(vbuf, buf); diff --git a/source/dnode/mnode/impl/test/func/func.cpp b/source/dnode/mnode/impl/test/func/func.cpp index 64bca96702..ee60556639 100644 --- a/source/dnode/mnode/impl/test/func/func.cpp +++ b/source/dnode/mnode/impl/test/func/func.cpp @@ -49,6 +49,7 @@ TEST_F(MndTestFunc, 01_Show_Func) { } TEST_F(MndTestFunc, 02_Create_Func) { +#ifndef WINDOWS { SCreateFuncReq createReq = {0}; strcpy(createReq.name, ""); @@ -159,9 +160,11 @@ TEST_F(MndTestFunc, 02_Create_Func) { test.SendShowReq(TSDB_MGMT_TABLE_FUNC, "ins_functions", ""); EXPECT_EQ(test.GetShowRows(), 1); +#endif } TEST_F(MndTestFunc, 03_Retrieve_Func) { +#ifndef WINDOWS { SRetrieveFuncReq retrieveReq = {0}; retrieveReq.numOfFuncs = 1; @@ -376,9 +379,11 @@ TEST_F(MndTestFunc, 03_Retrieve_Func) { ASSERT_NE(pRsp, nullptr); ASSERT_EQ(pRsp->code, TSDB_CODE_MND_FUNC_NOT_EXIST); } +#endif } TEST_F(MndTestFunc, 04_Drop_Func) { +#ifndef WINDOWS { SDropFuncReq dropReq = {0}; strcpy(dropReq.name, ""); @@ -441,9 +446,11 @@ TEST_F(MndTestFunc, 04_Drop_Func) { test.SendShowReq(TSDB_MGMT_TABLE_FUNC, "ins_functions", ""); EXPECT_EQ(test.GetShowRows(), 1); +#endif } TEST_F(MndTestFunc, 05_Actual_code) { +#ifndef WINDOWS { SCreateFuncReq createReq = {0}; strcpy(createReq.name, "udf1"); @@ -507,4 +514,5 @@ TEST_F(MndTestFunc, 05_Actual_code) { } tFreeSRetrieveFuncRsp(&retrieveRsp); } -} \ No newline at end of file +#endif +} diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 2e65c9ef16..b157597e60 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1155,10 +1155,13 @@ int32_t taskDbBuildSnap(void* arg, SArray* pSnap) { taskDbAddRef(pTaskDb); int64_t chkpId = pTaskDb->chkpId; + taskDbRefChkp(pTaskDb, chkpId); code = taskDbDoCheckpoint(pTaskDb, chkpId); - taskDbRemoveRef(pTaskDb); + if (code != 0) { + taskDbUnRefChkp(pTaskDb, chkpId); + } - taskDbRefChkp(pTaskDb, pTaskDb->chkpId); + taskDbRemoveRef(pTaskDb); SStreamTask* pTask = pTaskDb->pTask; SStreamTaskSnap snap = {.streamId = pTask->id.streamId, @@ -1182,14 +1185,15 @@ int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo) { for (int i = 0; i < taosArrayGetSize(pSnapInfo); i++) { SStreamTaskSnap* pSnap = taosArrayGet(pSnapInfo, i); sprintf(buf, "0x%" PRIx64 "-0x%x", pSnap->streamId, (int32_t)pSnap->taskId); - STaskDbWrapper* pTaskDb = taosHashGet(pMeta->pTaskDbUnique, buf, strlen(buf)); - if (pTaskDb == NULL) { + STaskDbWrapper** pTaskDb = taosHashGet(pMeta->pTaskDbUnique, buf, strlen(buf)); + if (pTaskDb == NULL || *pTaskDb == NULL) { stWarn("stream backend:%p failed to find task db, streamId:% " PRId64 "", pMeta, pSnap->streamId); + memset(buf, 0, sizeof(buf)); continue; } memset(buf, 0, sizeof(buf)); - taskDbUnRefChkp(pTaskDb, pSnap->chkpId); + taskDbUnRefChkp(*pTaskDb, pSnap->chkpId); } taosThreadMutexUnlock(&pMeta->backendMutex); return 0; @@ -1989,7 +1993,8 @@ void taskDbRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) { void taskDbUnRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) { taosThreadRwlockWrlock(&pTaskDb->chkpDirLock); - for (int i = 0; i < taosArrayGetSize(pTaskDb->chkpInUse); i++) { + int32_t size = taosArrayGetSize(pTaskDb->chkpInUse); + for (int i = 0; i < size; i++) { int64_t* p = taosArrayGet(pTaskDb->chkpInUse, i); if (*p == chkp) { taosArrayRemove(pTaskDb->chkpInUse, i); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 047b169ec9..934ff898a9 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -184,16 +184,16 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i return code; } -static int32_t handleResultBlocks(SStreamTask* pTask, SArray* pRes, int32_t size) { +static int32_t handleSanhistoryResultBlocks(SStreamTask* pTask, SArray* pRes, int32_t size) { int32_t code = TSDB_CODE_SUCCESS; if (taosArrayGetSize(pRes) > 0) { SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(NULL, pTask, size, pRes); code = doOutputResultBlockImpl(pTask, pStreamBlocks); - if (code != TSDB_CODE_SUCCESS) { - stDebug("s-task:%s dump fill-history results failed, code:%s", pTask->id.idStr, tstrerror(code)); + if (code != TSDB_CODE_SUCCESS) { // should not have error code + stError("s-task:%s dump fill-history results failed, code:%s", pTask->id.idStr, tstrerror(code)); } } else { - taosArrayDestroy(pRes); + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); } return code; } @@ -268,6 +268,17 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0); } + // output queue is full, idle for 5 sec. + if (streamQueueIsFull(pTask->outputq.queue)) { + stWarn("s-task:%s outputQ is full, idle for 1sec and retry", id); + return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, STREAM_SCAN_HISTORY_TIMESLICE); + } + + if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) { + stWarn("s-task:%s downstream task inputQ blocked, idle for 5sec and retry", id); + return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, FILL_HISTORY_TASK_EXEC_INTERVAL); + } + SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); if (pRes == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -284,19 +295,13 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { } // dispatch the generated results - /*int32_t code = */handleResultBlocks(pTask, pRes, size); - - int64_t el = taosGetTimestampMs() - st; - - // downstream task input queue is full, try in 5sec - if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED && (pTask->info.fillHistory == 1)) { - return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, FILL_HISTORY_TASK_EXEC_INTERVAL); - } + /*int32_t code = */handleSanhistoryResultBlocks(pTask, pRes, size); if (finished) { return buildScanhistoryExecRet(TASK_SCANHISTORY_CONT, 0); } + int64_t el = taosGetTimestampMs() - st; if (el >= STREAM_SCAN_HISTORY_TIMESLICE && (pTask->info.fillHistory == 1)) { stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms", id, pTask->info.fillHistory, el / 1000.0); @@ -558,7 +563,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { if (streamQueueIsFull(pTask->outputq.queue)) { stWarn("s-task:%s outputQ is full, idle for 500ms and retry", id); - streamTaskSetIdleInfo(pTask, 500); + streamTaskSetIdleInfo(pTask, 1000); return 0; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 0aace1cb5b..59d49a8231 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -562,13 +562,16 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { tEncoderClear(&encoder); int64_t id[2] = {pTask->id.streamId, pTask->id.taskId}; - if (tdbTbUpsert(pMeta->pTaskDb, id, STREAM_TASK_KEY_LEN, buf, len, pMeta->txn) < 0) { - stError("s-task:%s save to disk failed, code:%s", pTask->id.idStr, tstrerror(terrno)); - return -1; + + code = tdbTbUpsert(pMeta->pTaskDb, id, STREAM_TASK_KEY_LEN, buf, len, pMeta->txn); + if (code != TSDB_CODE_SUCCESS) { + stError("s-task:%s task meta save to disk failed, code:%s", pTask->id.idStr, tstrerror(terrno)); + } else { + stDebug("s-task:%s task meta save to disk", pTask->id.idStr); } taosMemoryFree(buf); - return 0; + return code; } int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pTaskId) { diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 5596eb3dee..247baea16f 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -381,7 +381,7 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc } } - return TSDB_CODE_SUCCESS; + return code; } int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index eb67e61c4c..25015c4d33 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -144,7 +144,7 @@ TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) { int32_t streamCreateTaskDbSnapInfo(void* arg, char* path, SArray* pSnap) { return taskDbBuildSnap(arg, pSnap); } -int32_t streamDestroyTasdDbSnapInfo(void* arg, SArray* snap) { return taskDbDestroySnap(arg, snap); } +int32_t streamDestroyTaskDbSnapInfo(void* arg, SArray* snap) { return taskDbDestroySnap(arg, snap); } void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) { if (qDebugFlag & DEBUG_DEBUG) { @@ -333,7 +333,7 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) { } taosArrayDestroy(handle->pDbSnapSet); } - streamDestroyTasdDbSnapInfo(handle->pMeta, handle->pSnapInfoSet); + streamDestroyTaskDbSnapInfo(handle->pMeta, handle->pSnapInfoSet); if (handle->pSnapInfoSet) { for (int32_t i = 0; i < taosArrayGetSize(handle->pSnapInfoSet); i++) { SStreamTaskSnap* pSnap = taosArrayGet(handle->pSnapInfoSet, i); diff --git a/tools/CMakeLists.txt b/tools/CMakeLists.txt index 188abb4b58..a16a03d30a 100644 --- a/tools/CMakeLists.txt +++ b/tools/CMakeLists.txt @@ -140,8 +140,8 @@ ELSE () BUILD_COMMAND COMMAND set CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client COMMAND set CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib - COMMAND go build -a -o taosadapter.exe -ldflags "-s -w -X 'github.com/taosdata/taosadapter/v3/version.Version=${taos_version}' -X 'github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}' -X 'github.com/taosdata/taosadapter/v3/version.BuildInfo=${TD_VER_DATE}'" - COMMAND go build -a -o taosadapter-debug.exe -ldflags "-X 'github.com/taosdata/taosadapter/v3/version.Version=${taos_version}' -X 'github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}' -X 'github.com/taosdata/taosadapter/v3/version.BuildInfo=${TD_VER_DATE}'" + COMMAND go build -a -o taosadapter.exe -ldflags "-s -w -X 'github.com/taosdata/taosadapter/v3/version.Version=${taos_version}' -X 'github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}' -X 'github.com/taosdata/taosadapter/v3/version.BuildInfo=${TD_VER_OSTYPE}-${TD_VER_CPUTYPE} ${TD_VER_DATE}'" + COMMAND go build -a -o taosadapter-debug.exe -ldflags "-X 'github.com/taosdata/taosadapter/v3/version.Version=${taos_version}' -X 'github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}' -X 'github.com/taosdata/taosadapter/v3/version.BuildInfo=${TD_VER_OSTYPE}-${TD_VER_CPUTYPE} ${TD_VER_DATE}'" INSTALL_COMMAND COMMAND cmake -E echo "Comparessing taosadapter.exe" @@ -167,8 +167,8 @@ ELSE () PATCH_COMMAND COMMAND git clean -f -d BUILD_COMMAND - COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -ldflags "-s -w -X 'github.com/taosdata/taosadapter/v3/version.Version=${taos_version}' -X 'github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}' -X 'github.com/taosdata/taosadapter/v3/version.BuildInfo=${TD_VER_DATE}'" - COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -o taosadapter-debug -ldflags "-X 'github.com/taosdata/taosadapter/v3/version.Version=${taos_version}' -X 'github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}' -X 'github.com/taosdata/taosadapter/v3/version.BuildInfo=${TD_VER_DATE}'" + COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -ldflags "-s -w -X 'github.com/taosdata/taosadapter/v3/version.Version=${taos_version}' -X 'github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}' -X 'github.com/taosdata/taosadapter/v3/version.BuildInfo=${TD_VER_OSTYPE}-${TD_VER_CPUTYPE} ${TD_VER_DATE}'" + COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -o taosadapter-debug -ldflags "-X 'github.com/taosdata/taosadapter/v3/version.Version=${taos_version}' -X 'github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}' -X 'github.com/taosdata/taosadapter/v3/version.BuildInfo=${TD_VER_OSTYPE}-${TD_VER_CPUTYPE} ${TD_VER_DATE}'" INSTALL_COMMAND COMMAND cmake -E echo "Copy taosadapter" COMMAND cmake -E copy taosadapter ${CMAKE_BINARY_DIR}/build/bin @@ -192,19 +192,19 @@ ELSE () PATCH_COMMAND COMMAND git clean -f -d BUILD_COMMAND - COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -ldflags "-s -w -X 'github.com/taosdata/taosadapter/v3/version.Version=${taos_version}' -X 'github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}' -X 'github.com/taosdata/taosadapter/v3/version.BuildInfo=${TD_VER_DATE}'" - COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -o taosadapter-debug -ldflags "-X 'github.com/taosdata/taosadapter/v3/version.Version=${taos_version}' -X 'github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}' -X 'github.com/taosdata/taosadapter/v3/version.BuildInfo=${TD_VER_DATE}'" + COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -ldflags "-X 'github.com/taosdata/taosadapter/v3/version.Version=${taos_version}' -X 'github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}' -X 'github.com/taosdata/taosadapter/v3/version.BuildInfo=${TD_VER_OSTYPE}-${TD_VER_CPUTYPE} ${TD_VER_DATE}'" +# COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -o taosadapter-debug -ldflags "-X 'github.com/taosdata/taosadapter/v3/version.Version=${taos_version}' -X 'github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}' -X 'github.com/taosdata/taosadapter/v3/version.BuildInfo=${TD_VER_OSTYPE}-${TD_VER_CPUTYPE} ${TD_VER_DATE}'" INSTALL_COMMAND - COMMAND cmake -E echo "Comparessing taosadapter.exe" - COMMAND upx taosadapter || : +# COMMAND cmake -E echo "Comparessing taosadapter.exe" +# COMMAND upx taosadapter || : COMMAND cmake -E echo "Copy taosadapter" COMMAND cmake -E copy taosadapter ${CMAKE_BINARY_DIR}/build/bin COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/test/cfg/ COMMAND cmake -E echo "Copy taosadapter.toml" COMMAND cmake -E copy ./example/config/taosadapter.toml ${CMAKE_BINARY_DIR}/test/cfg/ COMMAND cmake -E copy ./taosadapter.service ${CMAKE_BINARY_DIR}/test/cfg/ - COMMAND cmake -E echo "Copy taosadapter-debug" - COMMAND cmake -E copy taosadapter-debug ${CMAKE_BINARY_DIR}/build/bin +# COMMAND cmake -E echo "Copy taosadapter-debug" +# COMMAND cmake -E copy taosadapter-debug ${CMAKE_BINARY_DIR}/build/bin ) ENDIF () ENDIF ()