refactor: do some internal refactoring.
parent fe7864b976
commit e84eeee6b8

In short: STaskTimestamp becomes STaskExecStatisInfo (tsInfo -> taskExecInfo, sinkStart -> execStart, plus new taskUpdateCount/latestUpdateTs fields), SStreamMeta.pTasks and pUpdateTaskList become pTasksMap and pUpdateTaskSet, STokenBucket and SMetaHbInfo turn into opaque heap-allocated types (pTokenBucket, pHbInfo), and dropping a task now commits the stream meta under the write latch.
@@ -321,15 +321,13 @@ typedef struct {
   int64_t init;
   int64_t step1Start;
   int64_t step2Start;
-  int64_t sinkStart;
-} STaskTimestamp;
+  int64_t execStart;
+  int32_t taskUpdateCount;
+  int64_t latestUpdateTs;
+} STaskExecStatisInfo;
 
-typedef struct STokenBucket {
-  int32_t capacity;       // total capacity
-  int64_t fillTimestamp;  // fill timestamp
-  int32_t numOfToken;     // total available tokens
-  int32_t rate;           // number of token per second
-} STokenBucket;
+typedef struct STokenBucket STokenBucket;
+typedef struct SMetaHbInfo  SMetaHbInfo;
 
 struct SStreamTask {
   int64_t ver;
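Note: the two forward typedefs added above are C's opaque-type idiom - the struct layouts move out of the public header, so other translation units can only hold pointers to them. A minimal self-contained sketch of the idiom (Widget and its functions are hypothetical, not part of this commit):

#include <stdint.h>
#include <stdlib.h>

/* public header: forward declaration only; callers may use Widget*,
   but cannot embed a Widget by value or touch its fields */
typedef struct Widget Widget;

/* private .c file: the only place that sees the layout */
struct Widget {
  int32_t capacity;
  int32_t rate;
};

Widget* widgetCreate(void) { return calloc(1, sizeof(Widget)); }  /* owner heap-allocates */
void    widgetDestroy(Widget* w) { free(w); }                     /* and must free explicitly */

This is why SStreamTask below switches from an embedded tokenBucket to a pTokenBucket pointer, and why streamTaskInit and tFreeStreamTask later in this commit gain the matching taosMemoryCalloc/taosMemoryFree calls.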
@@ -345,7 +343,7 @@ struct SStreamTask {
   SDataRange          dataRange;
   SStreamTaskId       historyTaskId;
   SStreamTaskId       streamTaskId;
-  STaskTimestamp      tsInfo;
+  STaskExecStatisInfo taskExecInfo;
   SArray*             pReadyMsgList;  // SArray<SStreamChkptReadyInfo*>
   TdThreadMutex       lock;           // secure the operation of set task status and puting data into inputQ
   SArray*             pUpstreamInfoList;
@@ -359,7 +357,7 @@ struct SStreamTask {
     STaskSinkFetch fetchSink;
   };
   SSinkTaskRecorder sinkRecorder;
-  STokenBucket      tokenBucket;
+  STokenBucket*     pTokenBucket;
 
   void*   launchTaskTimer;
   SMsgCb* pMsgCb;  // msg handle
@@ -381,19 +379,13 @@ struct SStreamTask {
   char reserve[256];
 };
 
-typedef struct SMetaHbInfo {
-  tmr_h   hbTmr;
-  int32_t stopFlag;
-  int32_t tickCounter;
-} SMetaHbInfo;
-
 // meta
 typedef struct SStreamMeta {
   char*     path;
   TDB*      db;
   TTB*      pTaskDb;
   TTB*      pCheckpointDb;
-  SHashObj* pTasks;
+  SHashObj* pTasksMap;
   SArray*   pTaskList;  // SArray<task_id*>
   void*     ahandle;
   TXN*      txn;
@@ -403,15 +395,13 @@ typedef struct SStreamMeta {
   bool          leader;
   int8_t        taskWillbeLaunched;
   SRWLatch      lock;
-  // TdThreadRwlock lock;
   int32_t       walScanCounter;
   void*         streamBackend;
   int64_t       streamBackendRid;
   SHashObj*     pTaskBackendUnique;
   TdThreadMutex backendMutex;
-  SMetaHbInfo   hbInfo;
-  SHashObj*     pUpdateTaskList;
-  // int32_t    closedTask;
+  SMetaHbInfo*  pHbInfo;
+  SHashObj*     pUpdateTaskSet;
   int32_t       totalTasks;  // this value should be increased when a new task is added into the meta
   int32_t       chkptNotReadyTasks;
   int64_t       rid;
@@ -1006,8 +1006,8 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
 
   bool restored = pTq->pVnode->restored;
   if (p != NULL && restored) {
-    p->tsInfo.init = taosGetTimestampMs();
-    tqDebug("s-task:%s set the init ts:%"PRId64, p->id.idStr, p->tsInfo.init);
+    p->taskExecInfo.init = taosGetTimestampMs();
+    tqDebug("s-task:%s set the init ts:%"PRId64, p->id.idStr, p->taskExecInfo.init);
 
     streamTaskCheckDownstream(p);
   } else if (!restored) {
@@ -1045,14 +1045,14 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
   const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
   tqDebug("s-task:%s start scan-history stage(step 1), status:%s", id, pStatus);
 
-  if (pTask->tsInfo.step1Start == 0) {
+  if (pTask->taskExecInfo.step1Start == 0) {
     ASSERT(pTask->status.pauseAllowed == false);
-    pTask->tsInfo.step1Start = taosGetTimestampMs();
+    pTask->taskExecInfo.step1Start = taosGetTimestampMs();
     if (pTask->info.fillHistory == 1) {
       streamTaskEnablePause(pTask);
     }
   } else {
-    tqDebug("s-task:%s resume from paused, start ts:%" PRId64, pTask->id.idStr, pTask->tsInfo.step1Start);
+    tqDebug("s-task:%s resume from paused, start ts:%" PRId64, pTask->id.idStr, pTask->taskExecInfo.step1Start);
   }
 
   // we have to continue retrying to successfully execute the scan history task.
@@ -1072,7 +1072,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
 
   streamScanHistoryData(pTask);
   if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
-    double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
+    double el = (taosGetTimestampMs() - pTask->taskExecInfo.step1Start) / 1000.0;
     int8_t status = streamTaskSetSchedStatusInActive(pTask);
     tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el, status);
     streamMetaReleaseTask(pMeta, pTask);
@@ -1080,7 +1080,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
   }
 
   // the following procedure should be executed, no matter status is stop/pause or not
-  double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
+  double el = (taosGetTimestampMs() - pTask->taskExecInfo.step1Start) / 1000.0;
   tqDebug("s-task:%s scan-history stage(step 1) ended, elapsed time:%.2fs", id, el);
 
   if (pTask->info.fillHistory) {
@@ -1128,7 +1128,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
   done = streamHistoryTaskSetVerRangeStep2(pTask, latestVer);
 
   if (done) {
-    pTask->tsInfo.step2Start = taosGetTimestampMs();
+    pTask->taskExecInfo.step2Start = taosGetTimestampMs();
     qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0);
     streamTaskPutTranstateIntoInputQ(pTask);
     streamTryExec(pTask);  // exec directly
@@ -1140,7 +1140,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
            pStreamTask->id.idStr);
     ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
 
-    pTask->tsInfo.step2Start = taosGetTimestampMs();
+    pTask->taskExecInfo.step2Start = taosGetTimestampMs();
     streamSetParamForStreamScannerStep2(pTask, pRange, pWindow);
 
     int64_t dstVer = pTask->dataRange.range.minVer;
@@ -1344,6 +1344,13 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgL
   SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
   tqDebug("vgId:%d receive msg to drop stream task:0x%x", TD_VID(pTq->pVnode), pReq->taskId);
   streamMetaUnregisterTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId);
 
+  // commit the update
+  taosWLockLatch(&pTq->pStreamMeta->lock);
+  if (streamMetaCommit(pTq->pStreamMeta) < 0) {
+    // persist to disk
+  }
+  taosWUnLockLatch(&pTq->pStreamMeta->lock);
+
   return 0;
 }
@@ -1676,9 +1683,9 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
   // update the nodeEpset when it exists
   taosWLockLatch(&pMeta->lock);
 
-  // when replay the WAL, we should update the task epset one again and again, the task may be in stop status.
+  // the task epset may be updated again and again, when replaying the WAL, the task may be in stop status.
   int64_t keys[2] = {req.streamId, req.taskId};
-  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
+  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
 
   if (ppTask == NULL || *ppTask == NULL) {
     tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId,
@@ -1690,8 +1697,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
   }
 
   SStreamTask* pTask = *ppTask;
+  tqDebug("s-task:%s receive nodeEp update msg from mnode", pTask->id.idStr);
 
-  tqDebug("s-task:%s receive task nodeEp update msg from mnode", pTask->id.idStr);
   streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
   streamSetStatusNormal(pTask);
 
@@ -1700,7 +1707,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
     keys[0] = pTask->historyTaskId.streamId;
     keys[1] = pTask->historyTaskId.taskId;
 
-    ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
+    ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
     if (ppHTask == NULL || *ppHTask == NULL) {
       tqError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already",
               pMeta->vgId, req.taskId);
@@ -1722,14 +1729,12 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
   }
 
   streamTaskStop(pTask);
+  taosHashPut(pMeta->pUpdateTaskSet, &pTask->id, sizeof(pTask->id), NULL, 0);
 
   if (ppHTask != NULL) {
     streamTaskStop(*ppHTask);
-  }
-
-  taosHashPut(pMeta->pUpdateTaskList, &pTask->id, sizeof(pTask->id), NULL, 0);
-  if (ppHTask != NULL) {
     tqDebug("s-task:%s task nodeEp update completed, streamTask and related fill-history task closed", pTask->id.idStr);
-    taosHashPut(pMeta->pUpdateTaskList, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0);
+    taosHashPut(pMeta->pUpdateTaskSet, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0);
   } else {
     tqDebug("s-task:%s task nodeEp update completed, streamTask closed", pTask->id.idStr);
   }
@@ -1738,14 +1743,14 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
 
   // possibly only handle the stream task.
   int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
-  int32_t updateTasks = taosHashGetSize(pMeta->pUpdateTaskList);
+  int32_t updateTasks = taosHashGetSize(pMeta->pUpdateTaskSet);
   if (updateTasks < numOfTasks) {
     pMeta->taskWillbeLaunched = 1;
 
     tqDebug("vgId:%d closed tasks:%d, unclosed:%d", vgId, updateTasks, (numOfTasks - updateTasks));
     taosWUnLockLatch(&pMeta->lock);
   } else {
-    taosHashClear(pMeta->pUpdateTaskList);
+    taosHashClear(pMeta->pUpdateTaskSet);
 
     if (!pTq->pVnode->restored) {
       tqDebug("vgId:%d vnode restore not completed, not restart the tasks", vgId);
@@ -1129,7 +1129,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
   // update the table list handle for each stream scanner/wal reader
   taosWLockLatch(&pTq->pStreamMeta->lock);
   while (1) {
-    pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
+    pIter = taosHashIterate(pTq->pStreamMeta->pTasksMap, pIter);
     if (pIter == NULL) {
       break;
     }
@@ -274,7 +274,7 @@ int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2*
 
   if ((pTask->sinkRecorder.numOfSubmit % 5000) == 0) {
     SSinkTaskRecorder* pRec = &pTask->sinkRecorder;
-    double el = (taosGetTimestampMs() - pTask->tsInfo.sinkStart) / 1000.0;
+    double el = (taosGetTimestampMs() - pTask->taskExecInfo.execStart) / 1000.0;
     tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64
            " submit into dst table, duration:%.2f Sec.",
            pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, el);
@@ -755,8 +755,8 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
   int32_t     code = TSDB_CODE_SUCCESS;
   const char* id = pTask->id.idStr;
 
-  if (pTask->tsInfo.sinkStart == 0) {
-    pTask->tsInfo.sinkStart = taosGetTimestampMs();
+  if (pTask->taskExecInfo.execStart == 0) {
+    pTask->taskExecInfo.execStart = taosGetTimestampMs();
   }
 
   bool onlySubmitData = true;
@@ -94,8 +94,8 @@ int32_t tqCheckAndRunStreamTask(STQ* pTq) {
       continue;
     }
 
-    pTask->tsInfo.init = taosGetTimestampMs();
-    tqDebug("s-task:%s set the init ts:%"PRId64, pTask->id.idStr, pTask->tsInfo.init);
+    pTask->taskExecInfo.init = taosGetTimestampMs();
+    tqDebug("s-task:%s set the init ts:%"PRId64, pTask->id.idStr, pTask->taskExecInfo.init);
 
     streamSetStatusNormal(pTask);
     streamTaskCheckDownstream(pTask);
@@ -241,7 +241,7 @@ int32_t tqStartStreamTasks(STQ* pTq) {
     SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
 
     int64_t key[2] = {pTaskId->streamId, pTaskId->taskId};
-    SStreamTask** pTask = taosHashGet(pMeta->pTasks, key, sizeof(key));
+    SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, key, sizeof(key));
 
     int8_t status = (*pTask)->status.taskStatus;
     if (status == TASK_STATUS__STOP && (*pTask)->info.fillHistory != 1) {
@@ -307,7 +307,7 @@ void handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
            ", not scan wal anymore, add transfer-state block into inputQ",
            id, ver, maxVer);
 
-    double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0;
+    double el = (taosGetTimestampMs() - pTask->taskExecInfo.step2Start) / 1000.0;
     qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
     /*int32_t code = */streamTaskPutTranstateIntoInputQ(pTask);
     /*int32_t code = */streamSchedExec(pTask);
@@ -29,17 +29,24 @@ extern "C" {
 #define ONE_MB_F       (1048576.0)
 #define SIZE_IN_MB(_v) ((_v) / ONE_MB_F)
 
-typedef struct {
+typedef struct SStreamGlobalEnv {
   int8_t inited;
   void*  timer;
 } SStreamGlobalEnv;
 
-typedef struct {
+typedef struct SStreamContinueExecInfo {
   SEpSet  epset;
   int32_t taskId;
   SRpcMsg msg;
 } SStreamContinueExecInfo;
 
+struct STokenBucket {
+  int32_t capacity;       // total capacity
+  int64_t fillTimestamp;  // fill timestamp
+  int32_t numOfToken;     // total available tokens
+  int32_t rate;           // number of token per second
+};
+
 extern SStreamGlobalEnv streamEnv;
 extern int32_t          streamBackendId;
 extern int32_t          streamBackendCfWrapperId;
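STokenBucket's definition now lives in this internal header, so only the stream module sees its fields. The refill/consume logic itself is not part of this diff; a plausible sketch of how the four fields cooperate (TokenBucketSketch, refill and tryConsume are illustrative names, not this codebase's API):

#include <stdbool.h>
#include <stdint.h>

typedef struct {
  int32_t capacity;       // max tokens the bucket can hold
  int64_t fillTimestamp;  // last refill time, in ms
  int32_t numOfToken;     // tokens currently available
  int32_t rate;           // tokens added per second
} TokenBucketSketch;

static void refill(TokenBucketSketch* b, int64_t nowMs) {
  int64_t delta = nowMs - b->fillTimestamp;
  int32_t newTokens = (int32_t)(delta * b->rate / 1000);
  if (newTokens > 0) {
    int32_t total = b->numOfToken + newTokens;
    b->numOfToken = (total > b->capacity) ? b->capacity : total;
    b->fillTimestamp = nowMs;
  }
}

static bool tryConsume(TokenBucketSketch* b, int64_t nowMs) {
  refill(b, nowMs);
  if (b->numOfToken > 0) {
    b->numOfToken -= 1;  // one token per sink execution
    return true;
  }
  return false;  // caller skips this round, as streamTaskGetDataFromInputQ does below
}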
@@ -270,7 +270,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
     keys[0] = pId->streamId;
     keys[1] = pId->taskId;
 
-    SStreamTask** ppTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
+    SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
     if (ppTask == NULL) {
       continue;
     }
@@ -201,7 +201,7 @@ int32_t streamScanHistoryData(SStreamTask* pTask) {
 
   while (!finished) {
     if (streamTaskShouldPause(&pTask->status)) {
-      double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
+      double el = (taosGetTimestampMs() - pTask->taskExecInfo.step1Start) / 1000.0;
       qDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el);
       break;
     }
@@ -43,6 +43,12 @@ typedef struct {
   SHashObj* pTable;
 } SMetaRefMgt;
 
+struct SMetaHbInfo {
+  tmr_h   hbTmr;
+  int32_t stopFlag;
+  int32_t tickCounter;
+};
+
 SMetaRefMgt gMetaRefMgt;
 
 void metaRefMgtInit();
@@ -135,13 +141,18 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
   }
 
   _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
-  pMeta->pTasks = taosHashInit(64, fp, true, HASH_NO_LOCK);
-  if (pMeta->pTasks == NULL) {
+  pMeta->pTasksMap = taosHashInit(64, fp, true, HASH_NO_LOCK);
+  if (pMeta->pTasksMap == NULL) {
     goto _err;
   }
 
-  pMeta->pUpdateTaskList = taosHashInit(64, fp, false, HASH_NO_LOCK);
-  if (pMeta->pUpdateTaskList == NULL) {
+  pMeta->pUpdateTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK);
+  if (pMeta->pUpdateTaskSet == NULL) {
+    goto _err;
+  }
+
+  pMeta->pHbInfo = taosMemoryCalloc(1, sizeof(SMetaHbInfo));
+  if (pMeta->pHbInfo == NULL) {
     goto _err;
   }
 
@@ -165,9 +176,9 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
 
   metaRefMgtAdd(pMeta->vgId, pRid);
 
-  pMeta->hbInfo.hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamEnv.timer);
-  pMeta->hbInfo.tickCounter = 0;
-  pMeta->hbInfo.stopFlag = 0;
+  pMeta->pHbInfo->hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamEnv.timer);
+  pMeta->pHbInfo->tickCounter = 0;
+  pMeta->pHbInfo->stopFlag = 0;
 
   pMeta->pTaskBackendUnique =
       taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
@@ -201,11 +212,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
 
 _err:
   taosMemoryFree(pMeta->path);
-  if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
+  if (pMeta->pTasksMap) taosHashCleanup(pMeta->pTasksMap);
   if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
   if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb);
   if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb);
   if (pMeta->db) tdbClose(pMeta->db);
+  if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo);
+  if (pMeta->pUpdateTaskSet) taosHashCleanup(pMeta->pUpdateTaskSet);
   taosMemoryFree(pMeta);
@@ -259,7 +272,7 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) {
 
 void streamMetaClear(SStreamMeta* pMeta) {
   void* pIter = NULL;
-  while ((pIter = taosHashIterate(pMeta->pTasks, pIter)) != NULL) {
+  while ((pIter = taosHashIterate(pMeta->pTasksMap, pIter)) != NULL) {
     SStreamTask* p = *(SStreamTask**)pIter;
 
     // release the ref by timer
@@ -275,7 +288,7 @@ void streamMetaClear(SStreamMeta* pMeta) {
 
   taosRemoveRef(streamBackendId, pMeta->streamBackendRid);
 
-  taosHashClear(pMeta->pTasks);
+  taosHashClear(pMeta->pTasksMap);
   taosHashClear(pMeta->pTaskBackendUnique);
 
   taosArrayClear(pMeta->pTaskList);
@@ -316,9 +329,9 @@ void streamMetaCloseImpl(void* arg) {
   taosArrayDestroy(pMeta->chkpSaved);
   taosArrayDestroy(pMeta->chkpInUse);
 
-  taosHashCleanup(pMeta->pTasks);
+  taosHashCleanup(pMeta->pTasksMap);
   taosHashCleanup(pMeta->pTaskBackendUnique);
-  taosHashCleanup(pMeta->pUpdateTaskList);
+  taosHashCleanup(pMeta->pUpdateTaskSet);
 
   taosMemoryFree(pMeta->path);
   taosThreadMutexDestroy(&pMeta->backendMutex);
@@ -380,7 +393,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
   *pAdded = false;
 
   int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId};
-  void*   p = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
+  void*   p = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
   if (p == NULL) {
     if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
       tFreeStreamTask(pTask);
@@ -402,14 +415,14 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
     return 0;
   }
 
-  taosHashPut(pMeta->pTasks, keys, sizeof(keys), &pTask, POINTER_BYTES);
+  taosHashPut(pMeta->pTasksMap, keys, sizeof(keys), &pTask, POINTER_BYTES);
   *pAdded = true;
   return 0;
 }
 
 int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) {
-  size_t size = taosHashGetSize(pMeta->pTasks);
-  ASSERT(taosArrayGetSize(pMeta->pTaskList) == taosHashGetSize(pMeta->pTasks));
+  size_t size = taosHashGetSize(pMeta->pTasksMap);
+  ASSERT(taosArrayGetSize(pMeta->pTaskList) == taosHashGetSize(pMeta->pTasksMap));
   return (int32_t)size;
 }
@@ -420,7 +433,7 @@ int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta) {
     SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
     int64_t        keys[2] = {pId->streamId, pId->taskId};
 
-    SStreamTask** p = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
+    SStreamTask** p = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
     if (p == NULL) {
       continue;
     }
@@ -437,7 +450,7 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t
   taosRLockLatch(&pMeta->lock);
 
   int64_t       keys[2] = {streamId, taskId};
-  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
+  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
   if (ppTask != NULL) {
     if (!streamTaskShouldStop(&(*ppTask)->status)) {
       int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
@@ -481,7 +494,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
   taosWLockLatch(&pMeta->lock);
 
   int64_t       keys[2] = {streamId, taskId};
-  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
+  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
   if (ppTask) {
     pTask = *ppTask;
     if (streamTaskShouldPause(&pTask->status)) {
@@ -501,7 +514,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
 
   while (1) {
     taosRLockLatch(&pMeta->lock);
-    ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
+    ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
 
     if (ppTask) {
       if ((*ppTask)->status.timerActive == 0) {
@@ -520,9 +533,9 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
 
   // let's do delete of stream task
   taosWLockLatch(&pMeta->lock);
-  ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
+  ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
   if (ppTask) {
-    taosHashRemove(pMeta->pTasks, keys, sizeof(keys));
+    taosHashRemove(pMeta->pTasksMap, keys, sizeof(keys));
     atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
 
     ASSERT(pTask->status.timerActive == 0);
@@ -674,7 +687,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
 
     // do duplicate task check.
     int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId};
-    void* p = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
+    void* p = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
     if (p == NULL) {
       // pTask->chkInfo.checkpointVer may be 0, when a follower is become a leader
       // In this case, we try not to start fill-history task anymore.
@@ -692,7 +705,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
       continue;
     }
 
-    if (taosHashPut(pMeta->pTasks, keys, sizeof(keys), &pTask, sizeof(void*)) < 0) {
+    if (taosHashPut(pMeta->pTasksMap, keys, sizeof(keys), &pTask, sizeof(void*)) < 0) {
      doClear(pKey, pVal, pCur, pRecycleList);
      tFreeStreamTask(pTask);
      return -1;
@@ -779,8 +792,8 @@ void metaHbToMnode(void* param, void* tmrId) {
   }
 
   // need to stop, stop now
-  if (pMeta->hbInfo.stopFlag == STREAM_META_WILL_STOP) {
-    pMeta->hbInfo.stopFlag = STREAM_META_OK_TO_STOP;
+  if (pMeta->pHbInfo->stopFlag == STREAM_META_WILL_STOP) {
+    pMeta->pHbInfo->stopFlag = STREAM_META_OK_TO_STOP;
     qDebug("vgId:%d jump out of meta timer", pMeta->vgId);
     taosReleaseRef(streamMetaId, rid);
     return;
@@ -793,8 +806,8 @@ void metaHbToMnode(void* param, void* tmrId) {
     return;
   }
 
-  if (!enoughTimeDuration(&pMeta->hbInfo)) {
-    taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->hbInfo.hbTmr);
+  if (!enoughTimeDuration(pMeta->pHbInfo)) {
+    taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr);
     taosReleaseRef(streamMetaId, rid);
     return;
   }
@@ -813,7 +826,7 @@ void metaHbToMnode(void* param, void* tmrId) {
   for (int32_t i = 0; i < numOfTasks; ++i) {
     SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
     int64_t        keys[2] = {pId->streamId, pId->taskId};
-    SStreamTask**  pTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
+    SStreamTask**  pTask = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
 
     if ((*pTask)->info.fillHistory == 1) {
       continue;
@@ -873,7 +886,7 @@ void metaHbToMnode(void* param, void* tmrId) {
   }
 
   taosArrayDestroy(hbMsg.pTaskStatus);
-  taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->hbInfo.hbTmr);
+  taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr);
   taosReleaseRef(streamMetaId, rid);
 }
@@ -884,7 +897,7 @@ static bool hasStreamTaskInTimer(SStreamMeta* pMeta) {
 
   void* pIter = NULL;
   while (1) {
-    pIter = taosHashIterate(pMeta->pTasks, pIter);
+    pIter = taosHashIterate(pMeta->pTasksMap, pIter);
     if (pIter == NULL) {
       break;
     }
@@ -907,7 +920,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
 
   void* pIter = NULL;
   while (1) {
-    pIter = taosHashIterate(pMeta->pTasks, pIter);
+    pIter = taosHashIterate(pMeta->pTasksMap, pIter);
     if (pIter == NULL) {
       break;
     }
@@ -921,8 +934,8 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
 
   // wait for the stream meta hb function stopping
   if (pMeta->leader) {
-    pMeta->hbInfo.stopFlag = STREAM_META_WILL_STOP;
-    while (pMeta->hbInfo.stopFlag != STREAM_META_OK_TO_STOP) {
+    pMeta->pHbInfo->stopFlag = STREAM_META_WILL_STOP;
+    while (pMeta->pHbInfo->stopFlag != STREAM_META_OK_TO_STOP) {
       taosMsleep(100);
       qDebug("vgId:%d wait for meta to stop timer", pMeta->vgId);
    }
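The stopFlag exchange above (and in metaHbToMnode earlier) is a two-phase handshake: the closer announces WILL_STOP and polls, the timer callback acknowledges OK_TO_STOP and stops re-arming. A self-contained sketch of the same protocol; it uses C11 atomics for clarity, whereas the commit relies on plain int32_t stores plus sleeping:

#include <stdatomic.h>

enum { HB_RUNNING, HB_WILL_STOP, HB_OK_TO_STOP };  /* mirrors STREAM_META_WILL_STOP / _OK_TO_STOP */
static _Atomic int stopFlag = HB_RUNNING;

void onHbTimer(void) {                       /* timer side, cf. metaHbToMnode */
  if (atomic_load(&stopFlag) == HB_WILL_STOP) {
    atomic_store(&stopFlag, HB_OK_TO_STOP);  /* acknowledge; do not re-arm the timer */
    return;
  }
  /* ... send heartbeat, then re-arm ... */
}

void notifyClose(void) {                     /* closer side, cf. streamMetaNotifyClose */
  atomic_store(&stopFlag, HB_WILL_STOP);
  while (atomic_load(&stopFlag) != HB_OK_TO_STOP) {
    /* sleep ~100 ms per iteration, as taosMsleep(100) does above */
  }
}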
@@ -190,9 +190,8 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
     return TSDB_CODE_SUCCESS;
   }
 
-  STokenBucket* pBucket = &pTask->tokenBucket;
-  bool has = streamTaskHasAvailableToken(pBucket);
-  if (!has) { // no available token in th bucket, ignore this execution
+  STokenBucket* pBucket = pTask->pTokenBucket;
+  if (!streamTaskHasAvailableToken(pBucket)) {  // no available token in th bucket, ignore this execution
     //    qInfo("s-task:%s no available token for sink, capacity:%d, rate:%d token/sec, quit", pTask->id.idStr,
     //    pBucket->capacity, pBucket->rate);
     return TSDB_CODE_SUCCESS;
@@ -40,7 +40,7 @@ static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) {
   ASSERT(pTask->status.downstreamReady == 0);
   pTask->status.downstreamReady = 1;
 
-  int64_t el = (taosGetTimestampMs() - pTask->tsInfo.init);
+  int64_t el = (taosGetTimestampMs() - pTask->taskExecInfo.init);
   qDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%"PRId64"ms, task status:%s",
          pTask->id.idStr, numOfReqs, el, streamGetTaskStatusStr(pTask->status.taskStatus));
 }
@@ -525,7 +525,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
   taosWLockLatch(&pMeta->lock);
   int64_t keys[2] = {pInfo->streamId, pInfo->taskId};
 
-  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
+  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
   if (ppTask) {
     ASSERT((*ppTask)->status.timerActive >= 1);
@@ -590,7 +590,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
   int64_t keys[2] = {pTask->historyTaskId.streamId, hTaskId};
 
   // Set the execute conditions, including the query time window and the version range
-  SStreamTask** pHTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
+  SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
   if (pHTask == NULL) {
     qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr,
           pMeta->vgId, hTaskId);
@@ -355,6 +355,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
     pTask->pUpstreamInfoList = NULL;
   }
 
+  taosMemoryFree(pTask->pTokenBucket);
   taosThreadMutexDestroy(&pTask->lock);
   taosMemoryFree(pTask);
@@ -371,10 +372,10 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
 
   if (pTask->inputInfo.queue == NULL || pTask->outputInfo.queue == NULL) {
     qError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr);
-    return -1;
+    return TSDB_CODE_OUT_OF_MEMORY;
   }
 
-  pTask->tsInfo.created = taosGetTimestampMs();
+  pTask->taskExecInfo.created = taosGetTimestampMs();
   pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
   pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;
   pTask->pMeta = pMeta;
@@ -384,19 +385,25 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
   pTask->dataRange.range.minVer = ver;
   pTask->pMsgCb = pMsgCb;
 
-  streamTaskInitTokenBucket(&pTask->tokenBucket, 50, 50);
+  pTask->pTokenBucket = taosMemoryCalloc(1, sizeof(STokenBucket));
+  if (pTask->pTokenBucket == NULL) {
+    qError("s-task:%s failed to prepare the tokenBucket, code:%s", pTask->id.idStr, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
+    return TSDB_CODE_OUT_OF_MEMORY;
+  }
+
+  streamTaskInitTokenBucket(pTask->pTokenBucket, 50, 50);
 
   TdThreadMutexAttr attr = {0};
-  int ret = taosThreadMutexAttrInit(&attr);
-  if (ret != 0) {
-    qError("s-task:%s init mutex attr failed, code:%s", pTask->id.idStr, tstrerror(ret));
-    return ret;
+  int code = taosThreadMutexAttrInit(&attr);
+  if (code != 0) {
+    qError("s-task:%s init mutex attr failed, code:%s", pTask->id.idStr, tstrerror(code));
+    return code;
   }
 
-  ret = taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE);
-  if (ret != 0) {
-    qError("s-task:%s set mutex attr recursive, code:%s", pTask->id.idStr, tstrerror(ret));
-    return ret;
+  code = taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE);
+  if (code != 0) {
+    qError("s-task:%s set mutex attr recursive, code:%s", pTask->id.idStr, tstrerror(code));
+    return code;
   }
 
   taosThreadMutexInit(&pTask->lock, &attr);
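The attribute dance above exists because pTask->lock must be recursive: the same thread may lock it again while already holding it. Assuming the TdThread wrappers map onto POSIX threads (the PTHREAD_MUTEX_RECURSIVE constant suggests they do), a standalone equivalent:

#include <pthread.h>

int initRecursiveMutex(pthread_mutex_t* pLock) {
  pthread_mutexattr_t attr;
  int code = pthread_mutexattr_init(&attr);
  if (code != 0) return code;

  code = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
  if (code != 0) return code;

  return pthread_mutex_init(pLock, &attr);  /* relocking in the same thread now nests instead of deadlocking */
}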
@@ -517,7 +524,7 @@ int32_t streamTaskStop(SStreamTask* pTask) {
     taosMsleep(100);
   }
 
-  pTask->tsInfo.init = 0;
+  pTask->taskExecInfo.init = 0;
   int64_t el = taosGetTimestampMs() - st;
   qDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms, and reset init ts", pMeta->vgId, pTask->id.idStr, el);
   return 0;
@@ -547,10 +554,18 @@ int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) {
 }
 
 int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) {
+  STaskExecStatisInfo* p = &pTask->taskExecInfo;
+  qDebug("s-task:%s update task nodeEp epset, update count:%d, prevTs:%"PRId64, pTask->id.idStr,
+         p->taskUpdateCount + 1, p->latestUpdateTs);
+
+  p->taskUpdateCount += 1;
+  p->latestUpdateTs = taosGetTimestampMs();
+
   for (int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) {
     SNodeUpdateInfo* pInfo = taosArrayGet(pNodeList, i);
     doUpdateTaskEpset(pTask, pInfo->nodeId, &pInfo->newEp);
   }
 
   return 0;
 }