fix stream case error

This commit is contained in:
yihaoDeng 2023-10-25 19:38:22 +08:00
parent d43b3b4a32
commit db0f6258fe
6 changed files with 29 additions and 47 deletions

View File

@ -774,7 +774,7 @@ int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
int32_t streamMetaReloadAllTasks(SStreamMeta* pMeta);
void streamMetaNotifyClose(SStreamMeta* pMeta);
int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask);
int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char *key);
void streamMetaStartHb(SStreamMeta* pMeta);
void streamMetaInitForSnode(SStreamMeta* pMeta);
@ -794,4 +794,4 @@ int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInf
}
#endif
#endif /* ifndef _STREAM_H_ */
#endif /* ifndef _STREAM_H_ */

View File

@ -256,9 +256,11 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
taosMemoryFree(s);
}
SStreamTask task = {.id.taskId = 0, .id.streamId = 0}; // TODO: assign value
task.pMeta = pVnode->pTq->pStreamMeta;
pStreamState = streamStateOpen(taskInfDir, &task, true, -1, -1);
//SStreamTask task = {.id.taskId = 0, .id.streamId = 0}; // TODO: assign value
SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
pTask->pMeta = pVnode->pTq->pStreamMeta;
pStreamState = streamStateOpen(taskInfDir, pTask, true, -1, -1);
if (!pStreamState) {
terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN;
return TSDB_CODE_FAILED;
@ -1429,4 +1431,4 @@ _exit:
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
}
return code;
}
}

View File

@ -749,14 +749,12 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
SStreamTask* pStateTask = pTask;
// SStreamTask task = {0};
// if (pTask->info.fillHistory) {
// task.id.streamId = pTask->streamTaskId.streamId;
// task.id.taskId = pTask->streamTaskId.taskId;
// task.pMeta = pTask->pMeta;
// pStateTask = &task;
// }
// if (pTask->info.fillHistory) {
// pTask->id.streamId = pTask->streamTaskId.streamId;
// pTask->id.taskId = pTask->streamTaskId.taskId;
// }
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
if (pTask->pState == NULL) {
tqError("s-task:%s (vgId:%d) failed to open state for task", pTask->id.idStr, vgId);
@ -786,10 +784,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
SStreamTask* pSateTask = pTask;
// SStreamTask task = {0};
// if (pTask->info.fillHistory) {
// task.id.streamId = pTask->streamTaskId.streamId;
// task.id.taskId = pTask->streamTaskId.taskId;
// task.pMeta = pTask->pMeta;
// pSateTask = &task;
// pTask->id.streamId = pTask->streamTaskId.streamId;
// pTask->id.taskId = pTask->streamTaskId.taskId;
// }
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
@ -1984,4 +1980,4 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}
}

View File

@ -226,10 +226,10 @@ int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) {
return 0;
}
int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg) {
int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg, char *key) {
SStreamTask* pTask = arg;
char* key = (char*)pTask->id.idStr;
//char* key = (char*)pTask->id.idStr;
int64_t chkpId = pTask->checkpointingId;
taosThreadMutexLock(&pMeta->backendMutex);

View File

@ -110,31 +110,10 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz
pState->streamId = pStreamTask->id.streamId;
sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-%d", pState->streamId, pState->taskId);
streamTaskSetDb(pStreamTask->pMeta, pTask, pState->pTdbState->idstr);
#ifdef USE_ROCKSDB
SStreamMeta* pMeta = pStreamTask->pMeta;
// pState->streamBackendRid = pMeta->streamBackendRid;
// taosWLockLatch(&pMeta->lock);
// taosThreadMutexLock(&pMeta->backendMutex);
// void* uniqueId =
// taosHashGet(pMeta->pTaskDbUnique, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1);
// if (uniqueId == NULL) {
// int code = streamStateOpenBackend(pMeta->streamBackend, pState);
// if (code == -1) {
// taosThreadMutexUnlock(&pMeta->backendMutex);
// taosMemoryFree(pState);
// return NULL;
// }
// taosHashPut(pMeta->pTaskDbUnique, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1,
// &pState->pTdbState->backendCfWrapperId, sizeof(pState->pTdbState->backendCfWrapperId));
// } else {
// int64_t id = *(int64_t*)uniqueId;
// pState->pTdbState->backendCfWrapperId = id;
// pState->pTdbState->pBackendCfWrapper = taosAcquireRef(streamBackendCfWrapperId, id);
// // already exist stream task for
// qInfo("already exist stream-state for %s", pState->pTdbState->idstr);
// // taosAcquireRef(streamBackendId, pState->streamBackendRid);
// }
// taosThreadMutexUnlock(&pMeta->backendMutex);
pState->pTdbState->pOwner = pTask;
pState->pFileState = NULL;
@ -1219,4 +1198,4 @@ char* streamStateIntervalDump(SStreamState* pState) {
streamStateFreeCur(pCur);
return dumpBuf;
}
#endif
#endif

View File

@ -443,9 +443,14 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
}
taosThreadMutexInit(&pTask->lock, &attr);
if (streamTaskSetDb(pMeta, pTask) != 0) {
return -1;
}
// if (pTask->info.fillHistory == 1) {
// //
// } else {
// }
// if (streamTaskSetDb(pMeta, pTask) != 0) {
// return -1;
// }
streamTaskOpenAllUpstreamInput(pTask);
return TSDB_CODE_SUCCESS;