Merge pull request #22913 from taosdata/fix/liaohj

fix(stream): fix error in drop task.
Haojun Liao 2023-09-16 21:36:38 +08:00 committed by GitHub
commit caa7542cea
12 changed files with 212 additions and 137 deletions

View File

@ -321,15 +321,13 @@ typedef struct {
int64_t init;
int64_t step1Start;
int64_t step2Start;
int64_t sinkStart;
} STaskTimestamp;
int64_t start;
int32_t updateCount;
int64_t latestUpdateTs;
} STaskExecStatisInfo;
typedef struct STokenBucket {
int32_t capacity; // total capacity
int64_t fillTimestamp;// fill timestamp
int32_t numOfToken; // total available tokens
int32_t rate; // number of token per second
} STokenBucket;
typedef struct STokenBucket STokenBucket;
typedef struct SMetaHbInfo SMetaHbInfo;
struct SStreamTask {
int64_t ver;
@ -345,7 +343,7 @@ struct SStreamTask {
SDataRange dataRange;
SStreamTaskId historyTaskId;
SStreamTaskId streamTaskId;
STaskTimestamp tsInfo;
STaskExecStatisInfo taskExecInfo;
SArray* pReadyMsgList; // SArray<SStreamChkptReadyInfo*>
TdThreadMutex lock; // secure the operations of setting the task status and putting data into inputQ
SArray* pUpstreamInfoList;
@ -359,7 +357,7 @@ struct SStreamTask {
STaskSinkFetch fetchSink;
};
SSinkTaskRecorder sinkRecorder;
STokenBucket tokenBucket;
STokenBucket* pTokenBucket;
void* launchTaskTimer;
SMsgCb* pMsgCb; // msg handle
@ -381,19 +379,13 @@ struct SStreamTask {
char reserve[256];
};
typedef struct SMetaHbInfo {
tmr_h hbTmr;
int32_t stopFlag;
int32_t tickCounter;
} SMetaHbInfo;
// meta
typedef struct SStreamMeta {
char* path;
TDB* db;
TTB* pTaskDb;
TTB* pCheckpointDb;
SHashObj* pTasks;
SHashObj* pTasksMap;
SArray* pTaskList; // SArray<task_id*>
void* ahandle;
TXN* txn;
@ -403,15 +395,13 @@ typedef struct SStreamMeta {
bool leader;
int8_t taskWillbeLaunched;
SRWLatch lock;
// TdThreadRwlock lock;
int32_t walScanCounter;
void* streamBackend;
int64_t streamBackendRid;
SHashObj* pTaskBackendUnique;
TdThreadMutex backendMutex;
SMetaHbInfo hbInfo;
SHashObj* pUpdateTaskList;
// int32_t closedTask;
SMetaHbInfo* pHbInfo;
SHashObj* pUpdateTaskSet;
int32_t totalTasks; // this value should be increased when a new task is added into the meta
int32_t chkptNotReadyTasks;
int64_t rid;
@ -732,7 +722,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask);
int32_t streamAlignTransferState(SStreamTask* pTask);
int32_t streamBuildAndSendDropTaskMsg(SStreamTask* pTask, int32_t vgId, SStreamTaskId* pTaskId);
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask,
int8_t isSucceed);
int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg,

View File

@ -993,8 +993,8 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
bool restored = pTq->pVnode->restored;
if (p != NULL && restored) {
p->tsInfo.init = taosGetTimestampMs();
tqDebug("s-task:%s set the init ts:%"PRId64, p->id.idStr, p->tsInfo.init);
p->taskExecInfo.init = taosGetTimestampMs();
tqDebug("s-task:%s set the init ts:%"PRId64, p->id.idStr, p->taskExecInfo.init);
streamTaskCheckDownstream(p);
} else if (!restored) {
@ -1032,14 +1032,14 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
tqDebug("s-task:%s start scan-history stage(step 1), status:%s", id, pStatus);
if (pTask->tsInfo.step1Start == 0) {
if (pTask->taskExecInfo.step1Start == 0) {
ASSERT(pTask->status.pauseAllowed == false);
pTask->tsInfo.step1Start = taosGetTimestampMs();
pTask->taskExecInfo.step1Start = taosGetTimestampMs();
if (pTask->info.fillHistory == 1) {
streamTaskEnablePause(pTask);
}
} else {
tqDebug("s-task:%s resume from paused, start ts:%" PRId64, pTask->id.idStr, pTask->tsInfo.step1Start);
tqDebug("s-task:%s resume from paused, start ts:%" PRId64, pTask->id.idStr, pTask->taskExecInfo.step1Start);
}
// we have to continue retrying to successfully execute the scan history task.
@ -1059,7 +1059,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
streamScanHistoryData(pTask);
if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
double el = (taosGetTimestampMs() - pTask->taskExecInfo.step1Start) / 1000.0;
int8_t status = streamTaskSetSchedStatusInActive(pTask);
tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el, status);
streamMetaReleaseTask(pMeta, pTask);
@ -1067,7 +1067,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
}
// the following procedure should be executed no matter whether the status is stop/pause or not
double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
double el = (taosGetTimestampMs() - pTask->taskExecInfo.step1Start) / 1000.0;
tqDebug("s-task:%s scan-history stage(step 1) ended, elapsed time:%.2fs", id, el);
if (pTask->info.fillHistory) {
@ -1084,7 +1084,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("s-task:%s fill-history task set status to be dropping", id);
streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId);
// streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId);
streamBuildAndSendDropTaskMsg(pTask, pMeta->vgId, &pTask->id);
streamMetaReleaseTask(pMeta, pTask);
return -1;
}
@ -1115,7 +1116,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
done = streamHistoryTaskSetVerRangeStep2(pTask, latestVer);
if (done) {
pTask->tsInfo.step2Start = taosGetTimestampMs();
pTask->taskExecInfo.step2Start = taosGetTimestampMs();
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0);
streamTaskPutTranstateIntoInputQ(pTask);
streamTryExec(pTask); // exec directly
@ -1127,7 +1128,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
pStreamTask->id.idStr);
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
pTask->tsInfo.step2Start = taosGetTimestampMs();
pTask->taskExecInfo.step2Start = taosGetTimestampMs();
streamSetParamForStreamScannerStep2(pTask, pRange, pWindow);
int64_t dstVer = pTask->dataRange.range.minVer;
@ -1330,14 +1331,18 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
tqDebug("vgId:%d receive msg to drop stream task:0x%x", TD_VID(pTq->pVnode), pReq->taskId);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId);
if (pTask == NULL) {
tqError("vgId:%d failed to acquire s-task:0x%x when dropping it", pTq->pStreamMeta->vgId, pReq->taskId);
return 0;
}
streamMetaUnregisterTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
// commit the update
taosWLockLatch(&pTq->pStreamMeta->lock);
int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", TD_VID(pTq->pVnode), pReq->taskId, numOfTasks);
if (streamMetaCommit(pTq->pStreamMeta) < 0) {
// persist to disk
}
taosWUnLockLatch(&pTq->pStreamMeta->lock);
return 0;
}
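
For orientation, here is a minimal, self-contained sketch of the flow this handler now completes together with streamBuildAndSendDropTaskMsg (shown at the end of this diff): the fill-history task no longer unregisters itself inline; it posts a drop request, and the vnode-side handler later removes the task from the in-memory map and then commits the updated meta. This is only an illustration of the pattern under stated assumptions; SDropReqSketch, handleDropReqSketch, and the array registry are invented names, not TDengine APIs.

#include <stdint.h>
#include <stdio.h>

typedef struct {
  int64_t streamId;
  int32_t taskId;
} SDropReqSketch;                                       // mirrors the fields used from SVDropStreamTaskReq

#define MAX_TASKS_SKETCH 16
static int32_t taskRegistry[MAX_TASKS_SKETCH];          // stands in for pMeta->pTasksMap / pTaskList
static int32_t numOfTasksSketch = 0;

static void commitMetaSketch(void) {                    // stands in for streamMetaCommit(): persist survivors
  printf("commit meta, remain tasks:%d\n", numOfTasksSketch);
}

static int32_t handleDropReqSketch(const SDropReqSketch* pReq) {
  int32_t idx = -1;
  for (int32_t i = 0; i < numOfTasksSketch; ++i) {      // acquire: look the task up first
    if (taskRegistry[i] == pReq->taskId) {
      idx = i;
      break;
    }
  }
  if (idx < 0) {                                        // already dropped elsewhere: nothing to do
    printf("task:0x%x not found when dropping it, ignore\n", pReq->taskId);
    return 0;
  }
  for (int32_t i = idx + 1; i < numOfTasksSketch; ++i) {  // unregister from the in-memory map
    taskRegistry[i - 1] = taskRegistry[i];
  }
  numOfTasksSketch -= 1;
  commitMetaSketch();                                   // the real handler does this under the meta write lock
  return 0;
}
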
@ -1669,9 +1674,9 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
// update the nodeEpset when it exists
taosWLockLatch(&pMeta->lock);
// when replay the WAL, we should update the task epset one again and again, the task may be in stop status.
// the task epset may be updated again and again when replaying the WAL; the task may be in stop status.
int64_t keys[2] = {req.streamId, req.taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
if (ppTask == NULL || *ppTask == NULL) {
tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId,
@ -1683,8 +1688,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
}
SStreamTask* pTask = *ppTask;
tqDebug("s-task:%s receive nodeEp update msg from mnode", pTask->id.idStr);
tqDebug("s-task:%s receive task nodeEp update msg from mnode", pTask->id.idStr);
streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
streamSetStatusNormal(pTask);
@ -1693,7 +1698,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
keys[0] = pTask->historyTaskId.streamId;
keys[1] = pTask->historyTaskId.taskId;
ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
if (ppHTask == NULL || *ppHTask == NULL) {
tqError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already",
pMeta->vgId, req.taskId);
@ -1715,14 +1720,12 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
}
streamTaskStop(pTask);
taosHashPut(pMeta->pUpdateTaskSet, &pTask->id, sizeof(pTask->id), NULL, 0);
if (ppHTask != NULL) {
streamTaskStop(*ppHTask);
}
taosHashPut(pMeta->pUpdateTaskList, &pTask->id, sizeof(pTask->id), NULL, 0);
if (ppHTask != NULL) {
tqDebug("s-task:%s task nodeEp update completed, streamTask and related fill-history task closed", pTask->id.idStr);
taosHashPut(pMeta->pUpdateTaskList, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0);
taosHashPut(pMeta->pUpdateTaskSet, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0);
} else {
tqDebug("s-task:%s task nodeEp update completed, streamTask closed", pTask->id.idStr);
}
@ -1731,14 +1734,14 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
// possibly only handle the stream task.
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
int32_t updateTasks = taosHashGetSize(pMeta->pUpdateTaskList);
int32_t updateTasks = taosHashGetSize(pMeta->pUpdateTaskSet);
if (updateTasks < numOfTasks) {
pMeta->taskWillbeLaunched = 1;
tqDebug("vgId:%d closed tasks:%d, unclosed:%d", vgId, updateTasks, (numOfTasks - updateTasks));
taosWUnLockLatch(&pMeta->lock);
} else {
taosHashClear(pMeta->pUpdateTaskList);
taosHashClear(pMeta->pUpdateTaskSet);
if (!pTq->pVnode->restored) {
tqDebug("vgId:%d vnode restore not completed, not restart the tasks", vgId);

View File

@ -1129,7 +1129,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
// update the table list handle for each stream scanner/wal reader
taosWLockLatch(&pTq->pStreamMeta->lock);
while (1) {
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
pIter = taosHashIterate(pTq->pStreamMeta->pTasksMap, pIter);
if (pIter == NULL) {
break;
}

View File

@ -274,7 +274,7 @@ int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2*
if ((pTask->sinkRecorder.numOfSubmit % 5000) == 0) {
SSinkTaskRecorder* pRec = &pTask->sinkRecorder;
double el = (taosGetTimestampMs() - pTask->tsInfo.sinkStart) / 1000.0;
double el = (taosGetTimestampMs() - pTask->taskExecInfo.start) / 1000.0;
tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64
" submit into dst table, duration:%.2f Sec.",
pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, el);
@ -755,8 +755,8 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
int32_t code = TSDB_CODE_SUCCESS;
const char* id = pTask->id.idStr;
if (pTask->tsInfo.sinkStart == 0) {
pTask->tsInfo.sinkStart = taosGetTimestampMs();
if (pTask->taskExecInfo.start == 0) {
pTask->taskExecInfo.start = taosGetTimestampMs();
}
bool onlySubmitData = true;

View File

@ -94,8 +94,8 @@ int32_t tqCheckAndRunStreamTask(STQ* pTq) {
continue;
}
pTask->tsInfo.init = taosGetTimestampMs();
tqDebug("s-task:%s set the init ts:%"PRId64, pTask->id.idStr, pTask->tsInfo.init);
pTask->taskExecInfo.init = taosGetTimestampMs();
tqDebug("s-task:%s set the init ts:%"PRId64, pTask->id.idStr, pTask->taskExecInfo.init);
streamSetStatusNormal(pTask);
streamTaskCheckDownstream(pTask);
@ -241,7 +241,7 @@ int32_t tqStartStreamTasks(STQ* pTq) {
SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
int64_t key[2] = {pTaskId->streamId, pTaskId->taskId};
SStreamTask** pTask = taosHashGet(pMeta->pTasks, key, sizeof(key));
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, key, sizeof(key));
int8_t status = (*pTask)->status.taskStatus;
if (status == TASK_STATUS__STOP && (*pTask)->info.fillHistory != 1) {
@ -307,7 +307,7 @@ void handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
", not scan wal anymore, add transfer-state block into inputQ",
id, ver, maxVer);
double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0;
double el = (taosGetTimestampMs() - pTask->taskExecInfo.step2Start) / 1000.0;
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
/*int32_t code = */streamTaskPutTranstateIntoInputQ(pTask);
/*int32_t code = */streamSchedExec(pTask);

View File

@ -29,17 +29,24 @@ extern "C" {
#define ONE_MB_F (1048576.0)
#define SIZE_IN_MB(_v) ((_v) / ONE_MB_F)
typedef struct {
typedef struct SStreamGlobalEnv {
int8_t inited;
void* timer;
} SStreamGlobalEnv;
typedef struct {
typedef struct SStreamContinueExecInfo {
SEpSet epset;
int32_t taskId;
SRpcMsg msg;
} SStreamContinueExecInfo;
struct STokenBucket {
int32_t capacity; // total capacity
int64_t fillTimestamp; // fill timestamp
int32_t numOfToken; // total available tokens
int32_t rate; // number of tokens per second
};
extern SStreamGlobalEnv streamEnv;
extern int32_t streamBackendId;
extern int32_t streamBackendCfWrapperId;
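
A minimal sketch of how a token bucket with exactly these fields is typically refilled and consumed. This is an assumption about the intended mechanics, not the actual streamTaskInitTokenBucket/streamTaskHasAvailableToken implementation; the *Sketch names below are invented.

#include <stdbool.h>
#include <stdint.h>

typedef struct {
  int32_t capacity;       // total capacity
  int64_t fillTimestamp;  // timestamp of the last refill, in ms
  int32_t numOfToken;     // currently available tokens
  int32_t rate;           // tokens added per second
} STokenBucketSketch;

// top the bucket up according to the time elapsed since the last refill
static void tokenBucketRefillSketch(STokenBucketSketch* pBucket, int64_t nowMs) {
  int32_t inc = (int32_t)((nowMs - pBucket->fillTimestamp) * pBucket->rate / 1000);
  if (inc <= 0) {
    return;
  }
  pBucket->numOfToken += inc;
  if (pBucket->numOfToken > pBucket->capacity) {
    pBucket->numOfToken = pBucket->capacity;
  }
  pBucket->fillTimestamp = nowMs;
}

// consume one token if available; the caller skips this round of execution otherwise
static bool tokenBucketConsumeSketch(STokenBucketSketch* pBucket, int64_t nowMs) {
  tokenBucketRefillSketch(pBucket, nowMs);
  if (pBucket->numOfToken <= 0) {
    return false;
  }
  pBucket->numOfToken -= 1;
  return true;
}
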

View File

@ -270,7 +270,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
keys[0] = pId->streamId;
keys[1] = pId->taskId;
SStreamTask** ppTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
if (ppTask == NULL) {
continue;
}

View File

@ -201,7 +201,7 @@ int32_t streamScanHistoryData(SStreamTask* pTask) {
while (!finished) {
if (streamTaskShouldPause(&pTask->status)) {
double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
double el = (taosGetTimestampMs() - pTask->taskExecInfo.step1Start) / 1000.0;
qDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el);
break;
}
@ -303,7 +303,8 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
pTask->id.idStr, pTask->streamTaskId.taskId);
// 1. free it and remove fill-history task from disk meta-store
streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId);
// streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId);
streamBuildAndSendDropTaskMsg(pStreamTask, pMeta->vgId, &pTask->id);
// 2. save to disk
taosWLockLatch(&pMeta->lock);
@ -365,7 +366,8 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);
// 4. free it and remove fill-history task from disk meta-store
streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId);
// streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId);
streamBuildAndSendDropTaskMsg(pStreamTask, pMeta->vgId, &pTask->id);
// 5. clear the link between fill-history task and stream task info
pStreamTask->historyTaskId.taskId = 0;
@ -408,6 +410,8 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states.
code = streamDoTransferStateToStreamTask(pTask);
} else { // drop fill-history task
streamBuildAndSendDropTaskMsg(pTask, pTask->pMeta->vgId, &pTask->id);
}
return code;
@ -503,7 +507,6 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
}
} else { // non-dispatch task, do task state transfer directly
streamFreeQitem((SStreamQueueItem*)pBlock);
if (level != TASK_LEVEL__SINK) {
qDebug("s-task:%s non-dispatch task, start to transfer state directly", id);
ASSERT(pTask->info.fillHistory == 1);
code = streamTransferStateToStreamTask(pTask);
@ -511,9 +514,6 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
if (code != TSDB_CODE_SUCCESS) {
/*int8_t status = */ streamTaskSetSchedStatusInActive(pTask);
}
} else {
qDebug("s-task:%s sink task does not transfer state", id);
}
}
return code;

View File

@ -43,6 +43,14 @@ typedef struct {
SHashObj* pTable;
} SMetaRefMgt;
struct SMetaHbInfo {
tmr_h hbTmr;
int32_t stopFlag;
int32_t tickCounter;
int32_t hbCount;
int64_t hbStart;
};
SMetaRefMgt gMetaRefMgt;
void metaRefMgtInit();
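
The heartbeat timer created below fires every META_HB_CHECK_INTERVAL, and enoughTimeDuration() (further down in this file) decides whether a heartbeat is actually built and sent on a given tick. A plausible tick-counter gate is sketched here purely as an assumption; the ratio constant and the *Sketch names are invented and may differ from the real code.

#include <stdbool.h>
#include <stdint.h>

typedef struct {
  int32_t tickCounter;  // timer ticks since the last heartbeat was sent
  int32_t hbCount;      // total heartbeats sent so far
  int64_t hbStart;      // timestamp of the first heartbeat of the current leader term
} SMetaHbInfoSketch;

#define HB_SEND_EVERY_N_TICKS 5  // assumed ratio between timer ticks and heartbeats

static bool enoughTimeDurationSketch(SMetaHbInfoSketch* pInfo) {
  if (++pInfo->tickCounter < HB_SEND_EVERY_N_TICKS) {
    return false;          // not yet: the caller only re-arms the timer
  }
  pInfo->tickCounter = 0;  // reset and allow one heartbeat to be sent
  return true;
}
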
@ -134,13 +142,18 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
}
_hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
pMeta->pTasks = taosHashInit(64, fp, true, HASH_NO_LOCK);
if (pMeta->pTasks == NULL) {
pMeta->pTasksMap = taosHashInit(64, fp, true, HASH_NO_LOCK);
if (pMeta->pTasksMap == NULL) {
goto _err;
}
pMeta->pUpdateTaskList = taosHashInit(64, fp, false, HASH_NO_LOCK);
if (pMeta->pUpdateTaskList == NULL) {
pMeta->pUpdateTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK);
if (pMeta->pUpdateTaskSet == NULL) {
goto _err;
}
pMeta->pHbInfo = taosMemoryCalloc(1, sizeof(SMetaHbInfo));
if (pMeta->pHbInfo == NULL) {
goto _err;
}
@ -164,9 +177,9 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
metaRefMgtAdd(pMeta->vgId, pRid);
pMeta->hbInfo.hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamEnv.timer);
pMeta->hbInfo.tickCounter = 0;
pMeta->hbInfo.stopFlag = 0;
pMeta->pHbInfo->hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamEnv.timer);
pMeta->pHbInfo->tickCounter = 0;
pMeta->pHbInfo->stopFlag = 0;
pMeta->pTaskBackendUnique =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
@ -200,11 +213,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
_err:
taosMemoryFree(pMeta->path);
if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
if (pMeta->pTasksMap) taosHashCleanup(pMeta->pTasksMap);
if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb);
if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb);
if (pMeta->db) tdbClose(pMeta->db);
if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo);
if (pMeta->pUpdateTaskSet) taosHashCleanup(pMeta->pUpdateTaskSet);
taosMemoryFree(pMeta);
@ -258,7 +273,7 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) {
void streamMetaClear(SStreamMeta* pMeta) {
void* pIter = NULL;
while ((pIter = taosHashIterate(pMeta->pTasks, pIter)) != NULL) {
while ((pIter = taosHashIterate(pMeta->pTasksMap, pIter)) != NULL) {
SStreamTask* p = *(SStreamTask**)pIter;
// release the ref by timer
@ -274,7 +289,7 @@ void streamMetaClear(SStreamMeta* pMeta) {
taosRemoveRef(streamBackendId, pMeta->streamBackendRid);
taosHashClear(pMeta->pTasks);
taosHashClear(pMeta->pTasksMap);
taosHashClear(pMeta->pTaskBackendUnique);
taosArrayClear(pMeta->pTaskList);
@ -315,10 +330,11 @@ void streamMetaCloseImpl(void* arg) {
taosArrayDestroy(pMeta->chkpSaved);
taosArrayDestroy(pMeta->chkpInUse);
taosHashCleanup(pMeta->pTasks);
taosHashCleanup(pMeta->pTasksMap);
taosHashCleanup(pMeta->pTaskBackendUnique);
taosHashCleanup(pMeta->pUpdateTaskList);
taosHashCleanup(pMeta->pUpdateTaskSet);
taosMemoryFree(pMeta->pHbInfo);
taosMemoryFree(pMeta->path);
taosThreadMutexDestroy(&pMeta->backendMutex);
@ -379,7 +395,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
*pAdded = false;
int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId};
void* p = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
void* p = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
if (p == NULL) {
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
tFreeStreamTask(pTask);
@ -401,14 +417,14 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
return 0;
}
taosHashPut(pMeta->pTasks, keys, sizeof(keys), &pTask, POINTER_BYTES);
taosHashPut(pMeta->pTasksMap, keys, sizeof(keys), &pTask, POINTER_BYTES);
*pAdded = true;
return 0;
}
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) {
size_t size = taosHashGetSize(pMeta->pTasks);
ASSERT(taosArrayGetSize(pMeta->pTaskList) == taosHashGetSize(pMeta->pTasks));
size_t size = taosHashGetSize(pMeta->pTasksMap);
ASSERT(taosArrayGetSize(pMeta->pTaskList) == taosHashGetSize(pMeta->pTasksMap));
return (int32_t)size;
}
@ -419,7 +435,7 @@ int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta) {
SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
int64_t keys[2] = {pId->streamId, pId->taskId};
SStreamTask** p = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
SStreamTask** p = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
if (p == NULL) {
continue;
}
@ -436,7 +452,7 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t
taosRLockLatch(&pMeta->lock);
int64_t keys[2] = {streamId, taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
if (ppTask != NULL) {
if (!streamTaskShouldStop(&(*ppTask)->status)) {
int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
@ -480,7 +496,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
taosWLockLatch(&pMeta->lock);
int64_t keys[2] = {streamId, taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
if (ppTask) {
pTask = *ppTask;
if (streamTaskShouldPause(&pTask->status)) {
@ -500,7 +516,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
while (1) {
taosRLockLatch(&pMeta->lock);
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
if (ppTask) {
if ((*ppTask)->status.timerActive == 0) {
@ -519,9 +535,9 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
// let's delete the stream task
taosWLockLatch(&pMeta->lock);
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
if (ppTask) {
taosHashRemove(pMeta->pTasks, keys, sizeof(keys));
taosHashRemove(pMeta->pTasksMap, keys, sizeof(keys));
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
ASSERT(pTask->status.timerActive == 0);
@ -673,7 +689,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
// do duplicate task check.
int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId};
void* p = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
void* p = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
if (p == NULL) {
// pTask->chkInfo.checkpointVer may be 0 when a follower becomes a leader
// In this case, we try not to start the fill-history task anymore.
@ -691,7 +707,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
continue;
}
if (taosHashPut(pMeta->pTasks, keys, sizeof(keys), &pTask, sizeof(void*)) < 0) {
if (taosHashPut(pMeta->pTasksMap, keys, sizeof(keys), &pTask, sizeof(void*)) < 0) {
doClear(pKey, pVal, pCur, pRecycleList);
tFreeStreamTask(pTask);
return -1;
@ -771,15 +787,14 @@ static bool enoughTimeDuration(SMetaHbInfo* pInfo) {
void metaHbToMnode(void* param, void* tmrId) {
int64_t rid = *(int64_t*)param;
SStreamHbMsg hbMsg = {0};
SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid);
if (pMeta == NULL) {
return;
}
// need to stop, stop now
if (pMeta->hbInfo.stopFlag == STREAM_META_WILL_STOP) {
pMeta->hbInfo.stopFlag = STREAM_META_OK_TO_STOP;
if (pMeta->pHbInfo->stopFlag == STREAM_META_WILL_STOP) {
pMeta->pHbInfo->stopFlag = STREAM_META_OK_TO_STOP;
qDebug("vgId:%d jump out of meta timer", pMeta->vgId);
taosReleaseRef(streamMetaId, rid);
return;
@ -789,31 +804,37 @@ void metaHbToMnode(void* param, void* tmrId) {
if (!pMeta->leader) {
qInfo("vgId:%d follower not send hb to mnode", pMeta->vgId);
taosReleaseRef(streamMetaId, rid);
pMeta->pHbInfo->hbStart = 0;
return;
}
if (!enoughTimeDuration(&pMeta->hbInfo)) {
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->hbInfo.hbTmr);
// set the hb start time
if (pMeta->pHbInfo->hbStart == 0) {
pMeta->pHbInfo->hbStart = taosGetTimestampMs();
}
if (!enoughTimeDuration(pMeta->pHbInfo)) {
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr);
taosReleaseRef(streamMetaId, rid);
return;
}
qInfo("vgId:%d start hb", pMeta->vgId);
qDebug("vgId:%d build stream task hb, leader:%d", pMeta->vgId, pMeta->leader);
SStreamHbMsg hbMsg = {0};
taosRLockLatch(&pMeta->lock);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
SEpSet epset = {0};
bool hasValEpset = false;
hbMsg.vgId = pMeta->vgId;
hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry));
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
int64_t keys[2] = {pId->streamId, pId->taskId};
SStreamTask** pTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
int64_t keys[2] = {pId->streamId, pId->taskId};
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
if ((*pTask)->info.fillHistory == 1) {
continue;
}
@ -865,14 +886,17 @@ void metaHbToMnode(void* param, void* tmrId) {
initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen);
msg.info.noResp = 1;
qDebug("vgId:%d, build and send hb to mnode", pMeta->vgId);
pMeta->pHbInfo->hbCount += 1;
qDebug("vgId:%d, build and send hb to mnode, numOfTasks:%d total:%d", pMeta->vgId, hbMsg.numOfTasks,
pMeta->pHbInfo->hbCount);
tmsgSendReq(&epset, &msg);
} else {
qError("vgId:%d no mnd epset", pMeta->vgId);
qDebug("vgId:%d no tasks and no mnd epset, not send stream hb to mnode", pMeta->vgId);
}
taosArrayDestroy(hbMsg.pTaskStatus);
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->hbInfo.hbTmr);
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr);
taosReleaseRef(streamMetaId, rid);
}
@ -883,7 +907,7 @@ static bool hasStreamTaskInTimer(SStreamMeta* pMeta) {
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pMeta->pTasks, pIter);
pIter = taosHashIterate(pMeta->pTasksMap, pIter);
if (pIter == NULL) {
break;
}
@ -901,12 +925,14 @@ static bool hasStreamTaskInTimer(SStreamMeta* pMeta) {
void streamMetaNotifyClose(SStreamMeta* pMeta) {
int32_t vgId = pMeta->vgId;
qDebug("vgId:%d notify all stream tasks that the vnode is closing", vgId);
qDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb%" PRId64 ", totalHb:%d", vgId,
pMeta->leader, pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount);
taosWLockLatch(&pMeta->lock);
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pMeta->pTasks, pIter);
pIter = taosHashIterate(pMeta->pTasksMap, pIter);
if (pIter == NULL) {
break;
}
@ -920,8 +946,8 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
// wait for the stream meta hb function stopping
if (pMeta->leader) {
pMeta->hbInfo.stopFlag = STREAM_META_WILL_STOP;
while (pMeta->hbInfo.stopFlag != STREAM_META_OK_TO_STOP) {
pMeta->pHbInfo->stopFlag = STREAM_META_WILL_STOP;
while (pMeta->pHbInfo->stopFlag != STREAM_META_OK_TO_STOP) {
taosMsleep(100);
qDebug("vgId:%d wait for meta to stop timer", pMeta->vgId);
}

View File

@ -190,9 +190,8 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
return TSDB_CODE_SUCCESS;
}
STokenBucket* pBucket = &pTask->tokenBucket;
bool has = streamTaskHasAvailableToken(pBucket);
if (!has) { // no available token in th bucket, ignore this execution
STokenBucket* pBucket = pTask->pTokenBucket;
if (!streamTaskHasAvailableToken(pBucket)) { // no available token in the bucket, ignore this execution
// qInfo("s-task:%s no available token for sink, capacity:%d, rate:%d token/sec, quit", pTask->id.idStr,
// pBucket->capacity, pBucket->rate);
return TSDB_CODE_SUCCESS;

View File

@ -40,7 +40,7 @@ static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) {
ASSERT(pTask->status.downstreamReady == 0);
pTask->status.downstreamReady = 1;
int64_t el = (taosGetTimestampMs() - pTask->tsInfo.init);
int64_t el = (taosGetTimestampMs() - pTask->taskExecInfo.init);
qDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%"PRId64"ms, task status:%s",
pTask->id.idStr, numOfReqs, el, streamGetTaskStatusStr(pTask->status.taskStatus));
}
@ -525,7 +525,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
taosWLockLatch(&pMeta->lock);
int64_t keys[2] = {pInfo->streamId, pInfo->taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
if (ppTask) {
ASSERT((*ppTask)->status.timerActive >= 1);
@ -590,7 +590,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
int64_t keys[2] = {pTask->historyTaskId.streamId, hTaskId};
// Set the execute conditions, including the query time window and the version range
SStreamTask** pHTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
if (pHTask == NULL) {
qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr,
pMeta->vgId, hTaskId);

View File

@ -277,7 +277,20 @@ static void freeUpstreamItem(void* p) {
void tFreeStreamTask(SStreamTask* pTask) {
int32_t taskId = pTask->id.taskId;
qDebug("free s-task:0x%x, %p, state:%p", taskId, pTask, pTask->pState);
STaskExecStatisInfo* pStatis = &pTask->taskExecInfo;
qDebug("start to free s-task:0x%x, %p, state:%p, status:%s", taskId, pTask, pTask->pState,
streamGetTaskStatusStr(pTask->status.taskStatus));
qDebug("s-task:0x%x exec info: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64
", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64
" nextProcessVer:%" PRId64,
taskId, pStatis->created, pStatis->init, pStatis->start, pStatis->updateCount, pStatis->latestUpdateTs,
pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer, pTask->chkInfo.nextProcessVer);
if (pStatis->created == 0 || pStatis->init == 0 || pStatis->start == 0) {
int32_t k = 1;
}
// remove the ref by timer
while (pTask->status.timerActive > 0) {
@ -355,6 +368,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
pTask->pUpstreamInfoList = NULL;
}
taosMemoryFree(pTask->pTokenBucket);
taosThreadMutexDestroy(&pTask->lock);
taosMemoryFree(pTask);
@ -371,10 +385,10 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
if (pTask->inputInfo.queue == NULL || pTask->outputInfo.queue == NULL) {
qError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr);
return -1;
return TSDB_CODE_OUT_OF_MEMORY;
}
pTask->tsInfo.created = taosGetTimestampMs();
pTask->taskExecInfo.created = taosGetTimestampMs();
pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;
pTask->pMeta = pMeta;
@ -384,19 +398,25 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
pTask->dataRange.range.minVer = ver;
pTask->pMsgCb = pMsgCb;
streamTaskInitTokenBucket(&pTask->tokenBucket, 50, 50);
TdThreadMutexAttr attr = {0};
int ret = taosThreadMutexAttrInit(&attr);
if (ret != 0) {
qError("s-task:%s init mutex attr failed, code:%s", pTask->id.idStr, tstrerror(ret));
return ret;
pTask->pTokenBucket = taosMemoryCalloc(1, sizeof(STokenBucket));
if (pTask->pTokenBucket == NULL) {
qError("s-task:%s failed to prepare the tokenBucket, code:%s", pTask->id.idStr, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return TSDB_CODE_OUT_OF_MEMORY;
}
ret = taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE);
if (ret != 0) {
qError("s-task:%s set mutex attr recursive, code:%s", pTask->id.idStr, tstrerror(ret));
return ret;
streamTaskInitTokenBucket(pTask->pTokenBucket, 50, 50);
TdThreadMutexAttr attr = {0};
int code = taosThreadMutexAttrInit(&attr);
if (code != 0) {
qError("s-task:%s initElapsed mutex attr failed, code:%s", pTask->id.idStr, tstrerror(code));
return code;
}
code = taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE);
if (code != 0) {
qError("s-task:%s set mutex attr recursive, code:%s", pTask->id.idStr, tstrerror(code));
return code;
}
taosThreadMutexInit(&pTask->lock, &attr);
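
The attribute set above makes pTask->lock a recursive mutex, so a thread that already holds the lock can take it again on a nested call path. A self-contained sketch of why that matters, using plain pthreads as a stand-in for the TdThread wrappers (the *Sketch names are invented):

#include <pthread.h>

static pthread_mutex_t lockSketch;

static void initRecursiveLockSketch(void) {
  pthread_mutexattr_t attr;
  pthread_mutexattr_init(&attr);
  pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
  pthread_mutex_init(&lockSketch, &attr);
  pthread_mutexattr_destroy(&attr);
}

static void innerSketch(void) {
  pthread_mutex_lock(&lockSketch);  // same thread re-acquires the lock: allowed for a recursive mutex
  pthread_mutex_unlock(&lockSketch);
}

static void outerSketch(void) {
  pthread_mutex_lock(&lockSketch);
  innerSketch();                    // with a default (non-recursive) mutex this nested lock would deadlock
  pthread_mutex_unlock(&lockSketch);
}
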
@ -458,7 +478,8 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i);
if (pInfo->nodeId == nodeId) {
epsetAssign(&pInfo->epSet, pEpSet);
qDebug("s-task:0x%x update the upstreamInfo, nodeId:%d newEpset:%s", pTask->id.taskId, nodeId, buf);
qDebug("s-task:0x%x update the upstreamInfo, nodeId:%d taskId:0x%x newEpset:%s", pTask->id.taskId, nodeId,
pInfo->taskId, buf);
break;
}
}
@ -516,9 +537,8 @@ int32_t streamTaskStop(SStreamTask* pTask) {
taosMsleep(100);
}
pTask->tsInfo.init = 0;
int64_t el = taosGetTimestampMs() - st;
qDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms, and reset init ts", pMeta->vgId, pTask->id.idStr, el);
qDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", pMeta->vgId, pTask->id.idStr, el);
return 0;
}
@ -546,10 +566,18 @@ int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) {
}
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) {
STaskExecStatisInfo* p = &pTask->taskExecInfo;
qDebug("s-task:%s update task nodeEp epset, update count:%d, prevTs:%"PRId64, pTask->id.idStr,
p->updateCount + 1, p->latestUpdateTs);
p->updateCount += 1;
p->latestUpdateTs = taosGetTimestampMs();
for (int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) {
SNodeUpdateInfo* pInfo = taosArrayGet(pNodeList, i);
doUpdateTaskEpset(pTask, pInfo->nodeId, &pInfo->newEp);
}
return 0;
}
@ -599,3 +627,25 @@ int8_t streamTaskSetSchedStatusInActive(SStreamTask* pTask) {
return status;
}
int32_t streamBuildAndSendDropTaskMsg(SStreamTask* pTask, int32_t vgId, SStreamTaskId* pTaskId) {
SVDropStreamTaskReq *pReq = rpcMallocCont(sizeof(SVDropStreamTaskReq));
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pReq->head.vgId = vgId;
pReq->taskId = pTaskId->taskId;
pReq->streamId = pTaskId->streamId;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_DROP, .pCont = pReq, .contLen = sizeof(SVDropStreamTaskReq)};
int32_t code = tmsgPutToQueue(pTask->pMsgCb, WRITE_QUEUE, &msg);
if (code != TSDB_CODE_SUCCESS) {
qError("vgId:%d failed to send drop task:0x%x msg, code:%s", vgId, pTaskId->taskId, tstrerror(code));
return code;
}
qDebug("vgId:%d build and send drop table:0x%x msg", vgId, pTaskId->taskId);
return code;
}