diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 438734e191..dafd3aaa4a 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1944,6 +1944,16 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { // the following procedure consume many CPU resource, result in the re-election of leader // with high probability. So we employ it as a test case for the stream processing framework, with // checkpoint/restart/nodeUpdate etc. + while(1) { + int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1); + if (startVal == 0) { + break; + } + + tqDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId); + taosMsleep(500); + } + while (streamMetaTaskInTimer(pMeta)) { tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); taosMsleep(100); diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index b34b3420fe..441c71662e 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -44,7 +44,7 @@ typedef struct { int64_t defaultCfInit; } SBackendWrapper; -void* streamBackendInit(const char* path, int64_t chkpId); +void* streamBackendInit(const char* path, int64_t chkpId, int32_t vgId); void streamBackendCleanup(void* arg); void streamBackendHandleCleanup(void* arg); int32_t streamBackendLoadCheckpointInfo(void* pMeta); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index b22c6c9b0f..63dc497c6f 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -469,11 +469,11 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { return 0; } -void* streamBackendInit(const char* streamPath, int64_t chkpId) { +void* streamBackendInit(const char* streamPath, int64_t chkpId, int32_t vgId) { char* backendPath = NULL; int32_t code = rebuildDirFromCheckpoint(streamPath, chkpId, &backendPath); - stDebug("start to init stream backend at %s, checkpointid: %" PRId64 "", backendPath, chkpId); + stDebug("start to init stream backend at %s, checkpointid: %" PRId64 " vgId:%d", backendPath, chkpId, vgId); uint32_t dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20; SBackendWrapper* pHandle = taosMemoryCalloc(1, sizeof(SBackendWrapper)); @@ -534,7 +534,7 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId) { if (cfs != NULL) { rocksdb_list_column_families_destroy(cfs, nCf); } - stDebug("succ to init stream backend at %s, backend:%p", backendPath, pHandle); + stDebug("succ to init stream backend at %s, backend:%p, vgId:%d", backendPath, pHandle, vgId); taosMemoryFreeClear(backendPath); return (void*)pHandle; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 17cd9fac57..e6bbd89f02 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -195,10 +195,10 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF taosInitRWLatch(&pMeta->chkpDirLock); pMeta->chkpId = streamGetLatestCheckpointId(pMeta); - pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); + pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, vgId); while (pMeta->streamBackend == NULL) { taosMsleep(100); - pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); + pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, vgId); if (pMeta->streamBackend == NULL) { stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId); } @@ -263,7 +263,8 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) { } } - while ((pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId)) == NULL) { + // todo: not wait in a critical region + while ((pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId)) == NULL) { stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId); taosMsleep(100); }