refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-04-10 16:43:02 +08:00
parent cddfae1cc7
commit f23a8a37bc
5 changed files with 12 additions and 7 deletions

View File

@ -516,7 +516,6 @@ typedef struct SStreamMeta {
TdThreadMutex backendMutex; TdThreadMutex backendMutex;
SMetaHbInfo* pHbInfo; SMetaHbInfo* pHbInfo;
STaskUpdateInfo updateInfo; STaskUpdateInfo updateInfo;
SHashObj* pUpdateTaskSet;
int32_t numOfStreamTasks; // this value should be increased when a new task is added into the meta int32_t numOfStreamTasks; // this value should be increased when a new task is added into the meta
int32_t numOfPausedTasks; int32_t numOfPausedTasks;
int64_t rid; int64_t rid;

View File

@ -30,11 +30,11 @@
extern "C" { extern "C" {
#endif #endif
typedef struct SSnode { struct SSnode {
char* path; char* path;
SStreamMeta* pMeta; SStreamMeta* pMeta;
SMsgCb msgCb; SMsgCb msgCb;
} SSnode; };
#if 0 #if 0
typedef struct { typedef struct {

View File

@ -807,6 +807,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) { int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
STaskStartInfo* pStartInfo = &pMeta->startInfo; STaskStartInfo* pStartInfo = &pMeta->startInfo;
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
bool scanWal = false;
streamMetaWLock(pMeta); streamMetaWLock(pMeta);
if (pStartInfo->taskStarting == 1) { if (pStartInfo->taskStarting == 1) {
@ -831,10 +832,18 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
pStartInfo->restartCount = 0; pStartInfo->restartCount = 0;
tqDebug("vgId:%d all tasks are ready, reset restartCounter 0, not restart tasks", vgId); tqDebug("vgId:%d all tasks are ready, reset restartCounter 0, not restart tasks", vgId);
} }
scanWal = true;
} }
} }
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
if (scanWal && (vgId != SNODE_HANDLE)) {
tqDebug("vgId:%d start scan wal for executing tasks", vgId);
tqScanWalAsync(pMeta->ahandle, true);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -304,7 +304,6 @@ void streamMetaRemoveDB(void* arg, char* key) {
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage, SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage,
startComplete_fn_t fn) { startComplete_fn_t fn) {
int32_t code = -1;
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
if (pMeta == NULL) { if (pMeta == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -516,7 +515,6 @@ void streamMetaCloseImpl(void* arg) {
taosHashCleanup(pMeta->pTasksMap); taosHashCleanup(pMeta->pTasksMap);
taosHashCleanup(pMeta->pTaskDbUnique); taosHashCleanup(pMeta->pTaskDbUnique);
taosHashCleanup(pMeta->pUpdateTaskSet);
taosHashCleanup(pMeta->updateInfo.pTasks); taosHashCleanup(pMeta->updateInfo.pTasks);
taosHashCleanup(pMeta->startInfo.pReadyTaskSet); taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
taosHashCleanup(pMeta->startInfo.pFailedTaskSet); taosHashCleanup(pMeta->startInfo.pFailedTaskSet);

View File

@ -398,8 +398,7 @@ void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
if (pTask->status.taskStatus == TASK_STATUS__HALT) { if (pTask->status.taskStatus == TASK_STATUS__HALT) {
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) && (pTask->info.fillHistory == 0)); ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) && (pTask->info.fillHistory == 0));
// halt it self for count window stream task until the related // halt it self for count window stream task until the related fill history task completed.
// fill history task completd.
stDebug("s-task:%s level:%d initial status is %s from mnode, set it to be halt", pTask->id.idStr, stDebug("s-task:%s level:%d initial status is %s from mnode, set it to be halt", pTask->id.idStr,
pTask->info.taskLevel, streamTaskGetStatusStr(pTask->status.taskStatus)); pTask->info.taskLevel, streamTaskGetStatusStr(pTask->status.taskStatus));
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT); streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);