fix(stream): disable concurrently restart stream tasks.

This commit is contained in:
Haojun Liao 2023-11-10 09:24:08 +08:00
parent 4d9b422874
commit af08a189c1
4 changed files with 18 additions and 7 deletions

View File

@ -1944,6 +1944,16 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
// the following procedure consume many CPU resource, result in the re-election of leader // the following procedure consume many CPU resource, result in the re-election of leader
// with high probability. So we employ it as a test case for the stream processing framework, with // with high probability. So we employ it as a test case for the stream processing framework, with
// checkpoint/restart/nodeUpdate etc. // checkpoint/restart/nodeUpdate etc.
while(1) {
int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1);
if (startVal == 0) {
break;
}
tqDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId);
taosMsleep(500);
}
while (streamMetaTaskInTimer(pMeta)) { while (streamMetaTaskInTimer(pMeta)) {
tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
taosMsleep(100); taosMsleep(100);

View File

@ -44,7 +44,7 @@ typedef struct {
int64_t defaultCfInit; int64_t defaultCfInit;
} SBackendWrapper; } SBackendWrapper;
void* streamBackendInit(const char* path, int64_t chkpId); void* streamBackendInit(const char* path, int64_t chkpId, int32_t vgId);
void streamBackendCleanup(void* arg); void streamBackendCleanup(void* arg);
void streamBackendHandleCleanup(void* arg); void streamBackendHandleCleanup(void* arg);
int32_t streamBackendLoadCheckpointInfo(void* pMeta); int32_t streamBackendLoadCheckpointInfo(void* pMeta);

View File

@ -469,11 +469,11 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
return 0; return 0;
} }
void* streamBackendInit(const char* streamPath, int64_t chkpId) { void* streamBackendInit(const char* streamPath, int64_t chkpId, int32_t vgId) {
char* backendPath = NULL; char* backendPath = NULL;
int32_t code = rebuildDirFromCheckpoint(streamPath, chkpId, &backendPath); int32_t code = rebuildDirFromCheckpoint(streamPath, chkpId, &backendPath);
stDebug("start to init stream backend at %s, checkpointid: %" PRId64 "", backendPath, chkpId); stDebug("start to init stream backend at %s, checkpointid: %" PRId64 " vgId:%d", backendPath, chkpId, vgId);
uint32_t dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20; uint32_t dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20;
SBackendWrapper* pHandle = taosMemoryCalloc(1, sizeof(SBackendWrapper)); SBackendWrapper* pHandle = taosMemoryCalloc(1, sizeof(SBackendWrapper));
@ -534,7 +534,7 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId) {
if (cfs != NULL) { if (cfs != NULL) {
rocksdb_list_column_families_destroy(cfs, nCf); rocksdb_list_column_families_destroy(cfs, nCf);
} }
stDebug("succ to init stream backend at %s, backend:%p", backendPath, pHandle); stDebug("succ to init stream backend at %s, backend:%p, vgId:%d", backendPath, pHandle, vgId);
taosMemoryFreeClear(backendPath); taosMemoryFreeClear(backendPath);
return (void*)pHandle; return (void*)pHandle;

View File

@ -195,10 +195,10 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
taosInitRWLatch(&pMeta->chkpDirLock); taosInitRWLatch(&pMeta->chkpDirLock);
pMeta->chkpId = streamGetLatestCheckpointId(pMeta); pMeta->chkpId = streamGetLatestCheckpointId(pMeta);
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, vgId);
while (pMeta->streamBackend == NULL) { while (pMeta->streamBackend == NULL) {
taosMsleep(100); taosMsleep(100);
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, vgId);
if (pMeta->streamBackend == NULL) { if (pMeta->streamBackend == NULL) {
stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId); stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId);
} }
@ -263,7 +263,8 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) {
} }
} }
while ((pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId)) == NULL) { // todo: not wait in a critical region
while ((pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId)) == NULL) {
stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId); stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId);
taosMsleep(100); taosMsleep(100);
} }