Merge pull request #22944 from taosdata/fix/liaohj
refactor: do some internal refactor, and fix some bugs.
This commit is contained in:
commit
8aa9960760
|
@ -106,6 +106,7 @@ typedef struct {
|
||||||
} SStreamQueueItem;
|
} SStreamQueueItem;
|
||||||
|
|
||||||
typedef void FTbSink(SStreamTask* pTask, void* vnode, void* data);
|
typedef void FTbSink(SStreamTask* pTask, void* vnode, void* data);
|
||||||
|
typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data);
|
||||||
typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver);
|
typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -154,8 +155,6 @@ typedef struct {
|
||||||
int64_t size;
|
int64_t size;
|
||||||
} SStreamQueueRes;
|
} SStreamQueueRes;
|
||||||
|
|
||||||
void streamFreeQitem(SStreamQueueItem* data);
|
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
bool streamQueueResEmpty(const SStreamQueueRes* pRes);
|
bool streamQueueResEmpty(const SStreamQueueRes* pRes);
|
||||||
int64_t streamQueueResSize(const SStreamQueueRes* pRes);
|
int64_t streamQueueResSize(const SStreamQueueRes* pRes);
|
||||||
|
@ -185,12 +184,6 @@ typedef struct {
|
||||||
int32_t streamInit();
|
int32_t streamInit();
|
||||||
void streamCleanUp();
|
void streamCleanUp();
|
||||||
|
|
||||||
SStreamQueue* streamQueueOpen(int64_t cap);
|
|
||||||
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);
|
|
||||||
void streamQueueProcessSuccess(SStreamQueue* queue);
|
|
||||||
void streamQueueProcessFail(SStreamQueue* queue);
|
|
||||||
void* streamQueueNextItem(SStreamQueue* pQueue);
|
|
||||||
|
|
||||||
SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type);
|
SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type);
|
||||||
void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit);
|
void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit);
|
||||||
|
|
||||||
|
@ -222,8 +215,6 @@ typedef struct {
|
||||||
SSHashObj* pTblInfo;
|
SSHashObj* pTblInfo;
|
||||||
} STaskSinkTb;
|
} STaskSinkTb;
|
||||||
|
|
||||||
typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data);
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t smaId;
|
int64_t smaId;
|
||||||
// following are not applicable to encoder and decoder
|
// following are not applicable to encoder and decoder
|
||||||
|
@ -244,10 +235,10 @@ typedef struct SStreamChildEpInfo {
|
||||||
int64_t stage; // upstream task stage value, to denote if the upstream node has restart/replica changed/transfer
|
int64_t stage; // upstream task stage value, to denote if the upstream node has restart/replica changed/transfer
|
||||||
} SStreamChildEpInfo;
|
} SStreamChildEpInfo;
|
||||||
|
|
||||||
typedef struct SStreamTaskKey {
|
typedef struct STaskId {
|
||||||
int64_t streamId;
|
int64_t streamId;
|
||||||
int32_t taskId;
|
int64_t taskId;
|
||||||
} SStreamTaskKey;
|
} STaskId;
|
||||||
|
|
||||||
typedef struct SStreamTaskId {
|
typedef struct SStreamTaskId {
|
||||||
int64_t streamId;
|
int64_t streamId;
|
||||||
|
@ -341,8 +332,8 @@ struct SStreamTask {
|
||||||
SCheckpointInfo chkInfo;
|
SCheckpointInfo chkInfo;
|
||||||
STaskExec exec;
|
STaskExec exec;
|
||||||
SDataRange dataRange;
|
SDataRange dataRange;
|
||||||
SStreamTaskId historyTaskId;
|
STaskId historyTaskId;
|
||||||
SStreamTaskId streamTaskId;
|
STaskId streamTaskId;
|
||||||
STaskExecStatisInfo taskExecInfo;
|
STaskExecStatisInfo taskExecInfo;
|
||||||
SArray* pReadyMsgList; // SArray<SStreamChkptReadyInfo*>
|
SArray* pReadyMsgList; // SArray<SStreamChkptReadyInfo*>
|
||||||
TdThreadMutex lock; // secure the operation of set task status and puting data into inputQ
|
TdThreadMutex lock; // secure the operation of set task status and puting data into inputQ
|
||||||
|
@ -386,7 +377,7 @@ typedef struct SStreamMeta {
|
||||||
TTB* pTaskDb;
|
TTB* pTaskDb;
|
||||||
TTB* pCheckpointDb;
|
TTB* pCheckpointDb;
|
||||||
SHashObj* pTasksMap;
|
SHashObj* pTasksMap;
|
||||||
SArray* pTaskList; // SArray<task_id*>
|
SArray* pTaskList; // SArray<STaskId*>
|
||||||
void* ahandle;
|
void* ahandle;
|
||||||
TXN* txn;
|
TXN* txn;
|
||||||
FTaskExpand* expandFunc;
|
FTaskExpand* expandFunc;
|
||||||
|
@ -402,7 +393,8 @@ typedef struct SStreamMeta {
|
||||||
TdThreadMutex backendMutex;
|
TdThreadMutex backendMutex;
|
||||||
SMetaHbInfo* pHbInfo;
|
SMetaHbInfo* pHbInfo;
|
||||||
SHashObj* pUpdateTaskSet;
|
SHashObj* pUpdateTaskSet;
|
||||||
int32_t totalTasks; // this value should be increased when a new task is added into the meta
|
int32_t numOfStreamTasks; // this value should be increased when a new task is added into the meta
|
||||||
|
int32_t numOfPausedTasks;
|
||||||
int32_t chkptNotReadyTasks;
|
int32_t chkptNotReadyTasks;
|
||||||
int64_t rid;
|
int64_t rid;
|
||||||
|
|
||||||
|
@ -411,7 +403,6 @@ typedef struct SStreamMeta {
|
||||||
SArray* chkpInUse;
|
SArray* chkpInUse;
|
||||||
int32_t chkpCap;
|
int32_t chkpCap;
|
||||||
SRWLatch chkpDirLock;
|
SRWLatch chkpDirLock;
|
||||||
int32_t pauseTaskNum;
|
|
||||||
} SStreamMeta;
|
} SStreamMeta;
|
||||||
|
|
||||||
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
|
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
|
||||||
|
@ -425,7 +416,7 @@ void tFreeStreamTask(SStreamTask* pTask);
|
||||||
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver);
|
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver);
|
||||||
|
|
||||||
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo);
|
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo);
|
||||||
int32_t tDecodeStreamTaskId(SDecoder* pDecoder, SStreamTaskId* pTaskId);
|
int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId);
|
||||||
|
|
||||||
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem);
|
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem);
|
||||||
int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock);
|
int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock);
|
||||||
|
@ -516,7 +507,7 @@ typedef struct {
|
||||||
int32_t downstreamTaskId;
|
int32_t downstreamTaskId;
|
||||||
int32_t upstreamNodeId;
|
int32_t upstreamNodeId;
|
||||||
int32_t childId;
|
int32_t childId;
|
||||||
} SStreamScanHistoryFinishReq, SStreamTransferReq;
|
} SStreamScanHistoryFinishReq;
|
||||||
|
|
||||||
int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq);
|
int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq);
|
||||||
int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq);
|
int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq);
|
||||||
|
@ -562,8 +553,7 @@ int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpo
|
||||||
int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp);
|
int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp);
|
||||||
|
|
||||||
typedef struct STaskStatusEntry {
|
typedef struct STaskStatusEntry {
|
||||||
int64_t streamId;
|
STaskId id;
|
||||||
int32_t taskId;
|
|
||||||
int32_t status;
|
int32_t status;
|
||||||
} STaskStatusEntry;
|
} STaskStatusEntry;
|
||||||
|
|
||||||
|
@ -704,7 +694,7 @@ void streamMetaCleanup();
|
||||||
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage);
|
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage);
|
||||||
void streamMetaClose(SStreamMeta* streamMeta);
|
void streamMetaClose(SStreamMeta* streamMeta);
|
||||||
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); // save to stream meta store
|
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); // save to stream meta store
|
||||||
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int64_t* pKey);
|
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pKey);
|
||||||
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded);
|
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded);
|
||||||
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
|
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
|
||||||
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta);
|
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta);
|
||||||
|
|
|
@ -240,7 +240,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch
|
||||||
// internal
|
// internal
|
||||||
int32_t tsTransPullupInterval = 2;
|
int32_t tsTransPullupInterval = 2;
|
||||||
int32_t tsMqRebalanceInterval = 2;
|
int32_t tsMqRebalanceInterval = 2;
|
||||||
int32_t tsStreamCheckpointTickInterval = 30;
|
int32_t tsStreamCheckpointTickInterval = 300;
|
||||||
int32_t tsStreamNodeCheckInterval = 10;
|
int32_t tsStreamNodeCheckInterval = 10;
|
||||||
int32_t tsTtlUnit = 86400;
|
int32_t tsTtlUnit = 86400;
|
||||||
int32_t tsTtlPushIntervalSec = 10;
|
int32_t tsTtlPushIntervalSec = 10;
|
||||||
|
|
|
@ -799,17 +799,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// pDb = mndAcquireDb(pMnode, streamObj.sourceDb);
|
|
||||||
// if (pDb->cfg.replications != 1) {
|
|
||||||
// mError("stream source db must have only 1 replica, but %s has %d", pDb->name, pDb->cfg.replications);
|
|
||||||
// terrno = TSDB_CODE_MND_MULTI_REPLICA_SOURCE_DB;
|
|
||||||
// mndReleaseDb(pMnode, pDb);
|
|
||||||
// pDb = NULL;
|
|
||||||
// goto _OVER;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// mndReleaseDb(pMnode, pDb);
|
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-stream");
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-stream");
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
|
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
|
||||||
|
@ -1194,7 +1183,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
|
||||||
STaskStatusEntry *p = taosArrayGet(execNodeList.pTaskList, i);
|
STaskStatusEntry *p = taosArrayGet(execNodeList.pTaskList, i);
|
||||||
if (p->status != TASK_STATUS__NORMAL) {
|
if (p->status != TASK_STATUS__NORMAL) {
|
||||||
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, create checkpoint msg not issued",
|
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, create checkpoint msg not issued",
|
||||||
p->streamId, p->taskId, 0, streamGetTaskStatusStr(p->status));
|
p->id.streamId, (int32_t)p->id.taskId, 0, streamGetTaskStatusStr(p->status));
|
||||||
ready = false;
|
ready = false;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -1564,29 +1553,17 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
|
||||||
|
|
||||||
// status
|
// status
|
||||||
char status[20 + VARSTR_HEADER_SIZE] = {0};
|
char status[20 + VARSTR_HEADER_SIZE] = {0};
|
||||||
int8_t taskStatus = atomic_load_8(&pTask->status.taskStatus);
|
|
||||||
if (taskStatus == TASK_STATUS__NORMAL) {
|
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
|
||||||
memcpy(varDataVal(status), "normal", 6);
|
int32_t *index = taosHashGet(execNodeList.pTaskMap, &id, sizeof(id));
|
||||||
varDataSetLen(status, 6);
|
if (index == NULL) {
|
||||||
} else if (taskStatus == TASK_STATUS__DROPPING) {
|
continue;
|
||||||
memcpy(varDataVal(status), "dropping", 8);
|
|
||||||
varDataSetLen(status, 8);
|
|
||||||
} else if (taskStatus == TASK_STATUS__UNINIT) {
|
|
||||||
memcpy(varDataVal(status), "uninit", 6);
|
|
||||||
varDataSetLen(status, 4);
|
|
||||||
} else if (taskStatus == TASK_STATUS__STOP) {
|
|
||||||
memcpy(varDataVal(status), "stop", 4);
|
|
||||||
varDataSetLen(status, 4);
|
|
||||||
} else if (taskStatus == TASK_STATUS__SCAN_HISTORY) {
|
|
||||||
memcpy(varDataVal(status), "history", 7);
|
|
||||||
varDataSetLen(status, 7);
|
|
||||||
} else if (taskStatus == TASK_STATUS__HALT) {
|
|
||||||
memcpy(varDataVal(status), "halt", 4);
|
|
||||||
varDataSetLen(status, 4);
|
|
||||||
} else if (taskStatus == TASK_STATUS__PAUSE) {
|
|
||||||
memcpy(varDataVal(status), "pause", 5);
|
|
||||||
varDataSetLen(status, 5);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, *index);
|
||||||
|
const char* pStatus = streamGetTaskStatusStr(pStatusEntry->status);
|
||||||
|
STR_TO_VARSTR(status, pStatus);
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&status, false);
|
colDataSetVal(pColInfo, numOfRows, (const char *)&status, false);
|
||||||
|
|
||||||
|
@ -2269,16 +2246,16 @@ static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *p
|
||||||
int32_t numOfTasks = taosArrayGetSize(pLevel);
|
int32_t numOfTasks = taosArrayGetSize(pLevel);
|
||||||
for (int32_t j = 0; j < numOfTasks; j++) {
|
for (int32_t j = 0; j < numOfTasks; j++) {
|
||||||
SStreamTask *pTask = taosArrayGetP(pLevel, j);
|
SStreamTask *pTask = taosArrayGetP(pLevel, j);
|
||||||
int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId};
|
|
||||||
|
|
||||||
void *p = taosHashGet(pExecNode->pTaskMap, keys, sizeof(keys));
|
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
|
||||||
|
void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
STaskStatusEntry entry = {
|
STaskStatusEntry entry = {
|
||||||
.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .status = TASK_STATUS__STOP};
|
.id.streamId = pTask->id.streamId, .id.taskId = pTask->id.taskId, .status = TASK_STATUS__STOP};
|
||||||
taosArrayPush(pExecNode->pTaskList, &entry);
|
taosArrayPush(pExecNode->pTaskList, &entry);
|
||||||
|
|
||||||
int32_t ordinal = taosArrayGetSize(pExecNode->pTaskList) - 1;
|
int32_t ordinal = taosArrayGetSize(pExecNode->pTaskList) - 1;
|
||||||
taosHashPut(pExecNode->pTaskMap, keys, sizeof(keys), &ordinal, sizeof(ordinal));
|
taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &ordinal, sizeof(ordinal));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2311,8 +2288,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
|
|
||||||
for (int32_t i = 0; i < req.numOfTasks; ++i) {
|
for (int32_t i = 0; i < req.numOfTasks; ++i) {
|
||||||
STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
|
STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
|
||||||
int64_t k[2] = {p->streamId, p->taskId};
|
int32_t *index = taosHashGet(execNodeList.pTaskMap, &p->id, sizeof(p->id));
|
||||||
int32_t *index = taosHashGet(execNodeList.pTaskMap, &k, sizeof(k));
|
|
||||||
if (index == NULL) {
|
if (index == NULL) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -2320,7 +2296,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, *index);
|
STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, *index);
|
||||||
pStatusEntry->status = p->status;
|
pStatusEntry->status = p->status;
|
||||||
if (p->status != TASK_STATUS__NORMAL) {
|
if (p->status != TASK_STATUS__NORMAL) {
|
||||||
mDebug("received s-task:0x%x not in ready status:%s", p->taskId, streamGetTaskStatusStr(p->status));
|
mDebug("received s-task:0x%"PRIx64" not in ready status:%s", p->id.taskId, streamGetTaskStatusStr(p->status));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taosThreadMutexUnlock(&execNodeList.lock);
|
taosThreadMutexUnlock(&execNodeList.lock);
|
||||||
|
|
|
@ -189,15 +189,17 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
|
||||||
int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) {
|
int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) {
|
||||||
SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg;
|
SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg;
|
||||||
qDebug("snode:%d receive msg to drop stream task:0x%x", pSnode->pMeta->vgId, pReq->taskId);
|
qDebug("snode:%d receive msg to drop stream task:0x%x", pSnode->pMeta->vgId, pReq->taskId);
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->streamId, pReq->taskId);
|
|
||||||
if (pTask == NULL) {
|
|
||||||
qError("vgId:%d failed to acquire s-task:0x%x when dropping it", pSnode->pMeta->vgId, pReq->taskId);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
streamMetaUnregisterTask(pSnode->pMeta, pReq->streamId, pReq->taskId);
|
streamMetaUnregisterTask(pSnode->pMeta, pReq->streamId, pReq->taskId);
|
||||||
streamMetaReleaseTask(pSnode->pMeta, pTask);
|
|
||||||
|
// commit the update
|
||||||
|
taosWLockLatch(&pSnode->pMeta->lock);
|
||||||
|
int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta);
|
||||||
|
qDebug("vgId:%d task:0x%x dropped, remain tasks:%d", pSnode->pMeta->vgId, pReq->taskId, numOfTasks);
|
||||||
|
|
||||||
|
if (streamMetaCommit(pSnode->pMeta) < 0) {
|
||||||
|
// persist to disk
|
||||||
|
}
|
||||||
|
taosWUnLockLatch(&pSnode->pMeta->lock);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -259,7 +259,6 @@ int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqCheckLogInWal(STQ* pTq, int64_t version);
|
|
||||||
|
|
||||||
// sma
|
// sma
|
||||||
int32_t smaInit();
|
int32_t smaInit();
|
||||||
|
|
|
@ -726,7 +726,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
SStreamTask* pStateTask = pTask;
|
SStreamTask* pStateTask = pTask;
|
||||||
SStreamTask task = {0};
|
SStreamTask task = {0};
|
||||||
if (pTask->info.fillHistory) {
|
if (pTask->info.fillHistory) {
|
||||||
task.id = pTask->streamTaskId;
|
task.id.streamId = pTask->streamTaskId.streamId;
|
||||||
|
task.id.taskId = pTask->streamTaskId.taskId;
|
||||||
task.pMeta = pTask->pMeta;
|
task.pMeta = pTask->pMeta;
|
||||||
pStateTask = &task;
|
pStateTask = &task;
|
||||||
}
|
}
|
||||||
|
@ -760,7 +761,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
SStreamTask* pSateTask = pTask;
|
SStreamTask* pSateTask = pTask;
|
||||||
SStreamTask task = {0};
|
SStreamTask task = {0};
|
||||||
if (pTask->info.fillHistory) {
|
if (pTask->info.fillHistory) {
|
||||||
task.id = pTask->streamTaskId;
|
task.id.streamId = pTask->streamTaskId.streamId;
|
||||||
|
task.id.taskId = pTask->streamTaskId.taskId;
|
||||||
task.pMeta = pTask->pMeta;
|
task.pMeta = pTask->pMeta;
|
||||||
pSateTask = &task;
|
pSateTask = &task;
|
||||||
}
|
}
|
||||||
|
@ -845,14 +847,14 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
" child id:%d, level:%d, status:%s fill-history:%d, related stream task:0x%x trigger:%" PRId64 " ms",
|
" child id:%d, level:%d, status:%s fill-history:%d, related stream task:0x%x trigger:%" PRId64 " ms",
|
||||||
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
|
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
|
||||||
pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
|
pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
|
||||||
pTask->info.fillHistory, pTask->streamTaskId.taskId, pTask->info.triggerParam);
|
pTask->info.fillHistory, (int32_t)pTask->streamTaskId.taskId, pTask->info.triggerParam);
|
||||||
} else {
|
} else {
|
||||||
tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
|
tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
|
||||||
" nextProcessVer:%" PRId64
|
" nextProcessVer:%" PRId64
|
||||||
" child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 " ms",
|
" child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 " ms",
|
||||||
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
|
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
|
||||||
pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
|
pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
|
||||||
pTask->info.fillHistory, pTask->historyTaskId.taskId, pTask->info.triggerParam);
|
pTask->info.fillHistory, (int32_t)pTask->historyTaskId.taskId, pTask->info.triggerParam);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1079,7 +1081,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
|
pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
|
||||||
if (pStreamTask == NULL) {
|
if (pStreamTask == NULL) {
|
||||||
// todo delete this task, if the related stream task is dropped
|
// todo delete this task, if the related stream task is dropped
|
||||||
qError("failed to find s-task:0x%x, it may have been destroyed, drop fill-history task:%s",
|
qError("failed to find s-task:0x%"PRIx64", it may have been destroyed, drop fill-history task:%s",
|
||||||
pTask->streamTaskId.taskId, pTask->id.idStr);
|
pTask->streamTaskId.taskId, pTask->id.idStr);
|
||||||
|
|
||||||
tqDebug("s-task:%s fill-history task set status to be dropping", id);
|
tqDebug("s-task:%s fill-history task set status to be dropping", id);
|
||||||
|
@ -1365,7 +1367,8 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
if (pTask->historyTaskId.taskId != 0) {
|
if (pTask->historyTaskId.taskId != 0) {
|
||||||
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId);
|
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId);
|
||||||
if (pHistoryTask == NULL) {
|
if (pHistoryTask == NULL) {
|
||||||
tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%x, it may have been dropped already",
|
tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64
|
||||||
|
", it may have been dropped already",
|
||||||
pMeta->vgId, pTask->historyTaskId.taskId);
|
pMeta->vgId, pTask->historyTaskId.taskId);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
|
||||||
|
@ -1545,8 +1548,6 @@ FAIL:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) { return sversion <= pTq->walLogLastVer; }
|
|
||||||
|
|
||||||
// todo error code cannot be return, since this is invoked by an mnode-launched transaction.
|
// todo error code cannot be return, since this is invoked by an mnode-launched transaction.
|
||||||
int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
|
@ -1596,11 +1597,10 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
// set the initial value for generating check point
|
// set the initial value for generating check point
|
||||||
// set the mgmt epset info according to the checkout source msg from mnode, todo update mgmt epset if needed
|
// set the mgmt epset info according to the checkout source msg from mnode, todo update mgmt epset if needed
|
||||||
if (pMeta->chkptNotReadyTasks == 0) {
|
if (pMeta->chkptNotReadyTasks == 0) {
|
||||||
pMeta->chkptNotReadyTasks = streamMetaGetNumOfStreamTasks(pMeta);
|
pMeta->chkptNotReadyTasks = pMeta->numOfStreamTasks;
|
||||||
pMeta->totalTasks = pMeta->chkptNotReadyTasks;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
total = taosArrayGetSize(pMeta->pTaskList);
|
total = pMeta->numOfStreamTasks;
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
qDebug("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg, chkpt:%" PRId64 ", total checkpoint req:%d",
|
qDebug("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg, chkpt:%" PRId64 ", total checkpoint req:%d",
|
||||||
|
@ -1675,9 +1675,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
taosWLockLatch(&pMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
// the task epset may be updated again and again, when replaying the WAL, the task may be in stop status.
|
// the task epset may be updated again and again, when replaying the WAL, the task may be in stop status.
|
||||||
int64_t keys[2] = {req.streamId, req.taskId};
|
STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
|
||||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
|
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
|
|
||||||
if (ppTask == NULL || *ppTask == NULL) {
|
if (ppTask == NULL || *ppTask == NULL) {
|
||||||
tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId,
|
tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId,
|
||||||
req.taskId);
|
req.taskId);
|
||||||
|
@ -1695,10 +1694,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
SStreamTask** ppHTask = NULL;
|
SStreamTask** ppHTask = NULL;
|
||||||
if (pTask->historyTaskId.taskId != 0) {
|
if (pTask->historyTaskId.taskId != 0) {
|
||||||
keys[0] = pTask->historyTaskId.streamId;
|
ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->historyTaskId, sizeof(pTask->historyTaskId));
|
||||||
keys[1] = pTask->historyTaskId.taskId;
|
|
||||||
|
|
||||||
ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
|
|
||||||
if (ppHTask == NULL || *ppHTask == NULL) {
|
if (ppHTask == NULL || *ppHTask == NULL) {
|
||||||
tqError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already",
|
tqError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already",
|
||||||
pMeta->vgId, req.taskId);
|
pMeta->vgId, req.taskId);
|
||||||
|
|
|
@ -166,7 +166,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfPauseTasks = pTq->pStreamMeta->pauseTaskNum;
|
int32_t numOfPauseTasks = pTq->pStreamMeta->numOfPausedTasks;
|
||||||
if (ckPause && numOfTasks == numOfPauseTasks) {
|
if (ckPause && numOfTasks == numOfPauseTasks) {
|
||||||
tqDebug("vgId:%d ignore all submit, all streams had been paused, reset the walScanCounter", vgId);
|
tqDebug("vgId:%d ignore all submit, all streams had been paused, reset the walScanCounter", vgId);
|
||||||
|
|
||||||
|
@ -240,8 +240,8 @@ int32_t tqStartStreamTasks(STQ* pTq) {
|
||||||
for (int32_t i = 0; i < numOfTasks; ++i) {
|
for (int32_t i = 0; i < numOfTasks; ++i) {
|
||||||
SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
|
SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
|
||||||
|
|
||||||
int64_t key[2] = {pTaskId->streamId, pTaskId->taskId};
|
STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
|
||||||
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, key, sizeof(key));
|
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
|
|
||||||
int8_t status = (*pTask)->status.taskStatus;
|
int8_t status = (*pTask)->status.taskStatus;
|
||||||
if (status == TASK_STATUS__STOP && (*pTask)->info.fillHistory != 1) {
|
if (status == TASK_STATUS__STOP && (*pTask)->info.fillHistory != 1) {
|
||||||
|
|
|
@ -227,23 +227,20 @@ _err:
|
||||||
int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
STQ* pTq = pWriter->pTq;
|
STQ* pTq = pWriter->pTq;
|
||||||
STqHandle handle;
|
|
||||||
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
|
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
|
||||||
if (pHdr->type == SNAP_DATA_STREAM_TASK) {
|
if (pHdr->type == SNAP_DATA_STREAM_TASK) {
|
||||||
SStreamTaskId task = {0};
|
STaskId taskId = {0};
|
||||||
|
|
||||||
SDecoder decoder;
|
SDecoder decoder;
|
||||||
tDecoderInit(&decoder, (uint8_t*)pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
|
tDecoderInit(&decoder, (uint8_t*)pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
|
||||||
|
code = tDecodeStreamTaskId(&decoder, &taskId);
|
||||||
code = tDecodeStreamTaskId(&decoder, &task);
|
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
// tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn)
|
|
||||||
int64_t key[2] = {task.streamId, task.taskId};
|
|
||||||
|
|
||||||
|
int64_t key[2] = {taskId.streamId, taskId.taskId};
|
||||||
taosWLockLatch(&pTq->pStreamMeta->lock);
|
taosWLockLatch(&pTq->pStreamMeta->lock);
|
||||||
if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr),
|
if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr),
|
||||||
nData - sizeof(SSnapDataHdr), pTq->pStreamMeta->txn) < 0) {
|
nData - sizeof(SSnapDataHdr), pTq->pStreamMeta->txn) < 0) {
|
||||||
|
|
|
@ -86,6 +86,15 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask);
|
||||||
|
|
||||||
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t cap, int32_t rate);
|
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t cap, int32_t rate);
|
||||||
|
|
||||||
|
SStreamQueue* streamQueueOpen(int64_t cap);
|
||||||
|
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);
|
||||||
|
void streamQueueProcessSuccess(SStreamQueue* queue);
|
||||||
|
void streamQueueProcessFail(SStreamQueue* queue);
|
||||||
|
void* streamQueueNextItem(SStreamQueue* pQueue);
|
||||||
|
void streamFreeQitem(SStreamQueueItem* data);
|
||||||
|
|
||||||
|
STaskId extractStreamTaskKey(const SStreamTask* pTask);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -182,8 +182,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
|
||||||
taosWLockLatch(&pMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
if (pMeta->chkptNotReadyTasks == 0) {
|
if (pMeta->chkptNotReadyTasks == 0) {
|
||||||
pMeta->chkptNotReadyTasks = streamMetaGetNumOfStreamTasks(pMeta);
|
pMeta->chkptNotReadyTasks = pMeta->numOfStreamTasks;
|
||||||
pMeta->totalTasks = pMeta->chkptNotReadyTasks;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
@ -266,11 +265,8 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
|
||||||
|
|
||||||
int64_t keys[2];
|
int64_t keys[2];
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) {
|
||||||
SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
|
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
|
||||||
keys[0] = pId->streamId;
|
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
|
||||||
keys[1] = pId->taskId;
|
|
||||||
|
|
||||||
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
|
|
||||||
if (ppTask == NULL) {
|
if (ppTask == NULL) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -318,15 +314,13 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
|
||||||
|
|
||||||
if (remain == 0) { // all tasks are ready
|
if (remain == 0) { // all tasks are ready
|
||||||
qDebug("s-task:%s is ready for checkpoint", pTask->id.idStr);
|
qDebug("s-task:%s is ready for checkpoint", pTask->id.idStr);
|
||||||
pMeta->totalTasks = 0;
|
|
||||||
|
|
||||||
streamBackendDoCheckpoint(pMeta, pTask->checkpointingId);
|
streamBackendDoCheckpoint(pMeta, pTask->checkpointingId);
|
||||||
streamSaveAllTaskStatus(pMeta, pTask->checkpointingId);
|
streamSaveAllTaskStatus(pMeta, pTask->checkpointingId);
|
||||||
qDebug("vgId:%d vnode wide checkpoint completed, save all tasks status, checkpointId:%" PRId64, pMeta->vgId,
|
qDebug("vgId:%d vnode wide checkpoint completed, save all tasks status, checkpointId:%" PRId64, pMeta->vgId,
|
||||||
pTask->checkpointingId);
|
pTask->checkpointingId);
|
||||||
} else {
|
} else {
|
||||||
qDebug("vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, not ready:%d/%d", pMeta->vgId,
|
qDebug("vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, not ready:%d/%d", pMeta->vgId,
|
||||||
pTask->id.idStr, remain, pMeta->totalTasks);
|
pTask->id.idStr, remain, pMeta->numOfStreamTasks);
|
||||||
}
|
}
|
||||||
|
|
||||||
// send check point response to upstream task
|
// send check point response to upstream task
|
||||||
|
|
|
@ -300,10 +300,9 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
qError(
|
qError(
|
||||||
"s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed, destroy the related "
|
"s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed, destroy the related "
|
||||||
"fill-history task",
|
"fill-history task",
|
||||||
pTask->id.idStr, pTask->streamTaskId.taskId);
|
pTask->id.idStr, (int32_t) pTask->streamTaskId.taskId);
|
||||||
|
|
||||||
// 1. free it and remove fill-history task from disk meta-store
|
// 1. free it and remove fill-history task from disk meta-store
|
||||||
// streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId);
|
|
||||||
streamBuildAndSendDropTaskMsg(pStreamTask, pMeta->vgId, &pTask->id);
|
streamBuildAndSendDropTaskMsg(pStreamTask, pMeta->vgId, &pTask->id);
|
||||||
|
|
||||||
// 2. save to disk
|
// 2. save to disk
|
||||||
|
@ -371,6 +370,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
|
|
||||||
// 5. clear the link between fill-history task and stream task info
|
// 5. clear the link between fill-history task and stream task info
|
||||||
pStreamTask->historyTaskId.taskId = 0;
|
pStreamTask->historyTaskId.taskId = 0;
|
||||||
|
pStreamTask->historyTaskId.streamId = 0;
|
||||||
|
|
||||||
// 6. save to disk
|
// 6. save to disk
|
||||||
taosWLockLatch(&pMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
|
|
|
@ -36,7 +36,6 @@ static void metaHbToMnode(void* param, void* tmrId);
|
||||||
static void streamMetaClear(SStreamMeta* pMeta);
|
static void streamMetaClear(SStreamMeta* pMeta);
|
||||||
static int32_t streamMetaBegin(SStreamMeta* pMeta);
|
static int32_t streamMetaBegin(SStreamMeta* pMeta);
|
||||||
static void streamMetaCloseImpl(void* arg);
|
static void streamMetaCloseImpl(void* arg);
|
||||||
static void extractStreamTaskKey(int64_t* pKey, const SStreamTask* pTask);
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
TdThreadMutex mutex;
|
TdThreadMutex mutex;
|
||||||
|
@ -205,8 +204,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
||||||
taosInitRWLatch(&pMeta->lock);
|
taosInitRWLatch(&pMeta->lock);
|
||||||
taosThreadMutexInit(&pMeta->backendMutex, NULL);
|
taosThreadMutexInit(&pMeta->backendMutex, NULL);
|
||||||
|
|
||||||
pMeta->pauseTaskNum = 0;
|
pMeta->numOfPausedTasks = 0;
|
||||||
|
pMeta->numOfStreamTasks = 0;
|
||||||
qInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId,
|
qInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId,
|
||||||
stage);
|
stage);
|
||||||
return pMeta;
|
return pMeta;
|
||||||
|
@ -361,10 +360,8 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
|
||||||
tEncodeStreamTask(&encoder, pTask);
|
tEncodeStreamTask(&encoder, pTask);
|
||||||
tEncoderClear(&encoder);
|
tEncoderClear(&encoder);
|
||||||
|
|
||||||
int64_t key[2] = {0};
|
int64_t id[2] = {pTask->id.streamId, pTask->id.taskId};
|
||||||
extractStreamTaskKey(key, pTask);
|
if (tdbTbUpsert(pMeta->pTaskDb, id, STREAM_TASK_KEY_LEN, buf, len, pMeta->txn) < 0) {
|
||||||
|
|
||||||
if (tdbTbUpsert(pMeta->pTaskDb, key, STREAM_TASK_KEY_LEN, buf, len, pMeta->txn) < 0) {
|
|
||||||
qError("s-task:%s save to disk failed, code:%s", pTask->id.idStr, tstrerror(terrno));
|
qError("s-task:%s save to disk failed, code:%s", pTask->id.idStr, tstrerror(terrno));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -373,18 +370,14 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void extractStreamTaskKey(int64_t* pKey, const SStreamTask* pTask) {
|
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pTaskId) {
|
||||||
pKey[0] = pTask->id.streamId;
|
int64_t key[2] = {pTaskId->streamId, pTaskId->taskId};
|
||||||
pKey[1] = pTask->id.taskId;
|
int32_t code = tdbTbDelete(pMeta->pTaskDb, key, STREAM_TASK_KEY_LEN, pMeta->txn);
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int64_t* pKey) {
|
|
||||||
int32_t code = tdbTbDelete(pMeta->pTaskDb, pKey, STREAM_TASK_KEY_LEN, pMeta->txn);
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
qError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, (int32_t)pKey[1],
|
qError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, (int32_t) pTaskId->taskId,
|
||||||
tstrerror(terrno));
|
tstrerror(terrno));
|
||||||
} else {
|
} else {
|
||||||
qDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, (int32_t)pKey[1]);
|
qDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, (int32_t) pTaskId->taskId);
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
@ -394,8 +387,8 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int64_t* pKey) {
|
||||||
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) {
|
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) {
|
||||||
*pAdded = false;
|
*pAdded = false;
|
||||||
|
|
||||||
int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId};
|
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
|
||||||
void* p = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
|
void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
|
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
|
||||||
tFreeStreamTask(pTask);
|
tFreeStreamTask(pTask);
|
||||||
|
@ -417,7 +410,11 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashPut(pMeta->pTasksMap, keys, sizeof(keys), &pTask, POINTER_BYTES);
|
taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES);
|
||||||
|
if (pTask->info.fillHistory == 0) {
|
||||||
|
atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
|
||||||
|
}
|
||||||
|
|
||||||
*pAdded = true;
|
*pAdded = true;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -432,10 +429,8 @@ int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta) {
|
||||||
int32_t num = 0;
|
int32_t num = 0;
|
||||||
size_t size = taosArrayGetSize(pMeta->pTaskList);
|
size_t size = taosArrayGetSize(pMeta->pTaskList);
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
|
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
|
||||||
int64_t keys[2] = {pId->streamId, pId->taskId};
|
SStreamTask** p = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
|
||||||
|
|
||||||
SStreamTask** p = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
|
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -451,8 +446,8 @@ int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta) {
|
||||||
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
|
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
|
||||||
taosRLockLatch(&pMeta->lock);
|
taosRLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
int64_t keys[2] = {streamId, taskId};
|
STaskId id = {.streamId = streamId, .taskId = taskId};
|
||||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
|
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
if (ppTask != NULL) {
|
if (ppTask != NULL) {
|
||||||
if (!streamTaskShouldStop(&(*ppTask)->status)) {
|
if (!streamTaskShouldStop(&(*ppTask)->status)) {
|
||||||
int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
|
int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
|
||||||
|
@ -495,12 +490,12 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
||||||
// pre-delete operation
|
// pre-delete operation
|
||||||
taosWLockLatch(&pMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
int64_t keys[2] = {streamId, taskId};
|
STaskId id = {.streamId = streamId, .taskId = taskId};
|
||||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
|
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
if (ppTask) {
|
if (ppTask) {
|
||||||
pTask = *ppTask;
|
pTask = *ppTask;
|
||||||
if (streamTaskShouldPause(&pTask->status)) {
|
if (streamTaskShouldPause(&pTask->status)) {
|
||||||
int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1);
|
int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
|
||||||
qInfo("vgId:%d s-task:%s drop stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
|
qInfo("vgId:%d s-task:%s drop stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
|
||||||
}
|
}
|
||||||
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
|
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
|
||||||
|
@ -516,7 +511,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
taosRLockLatch(&pMeta->lock);
|
taosRLockLatch(&pMeta->lock);
|
||||||
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
|
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
|
|
||||||
if (ppTask) {
|
if (ppTask) {
|
||||||
if ((*ppTask)->status.timerActive == 0) {
|
if ((*ppTask)->status.timerActive == 0) {
|
||||||
|
@ -535,9 +530,19 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
||||||
|
|
||||||
// let's do delete of stream task
|
// let's do delete of stream task
|
||||||
taosWLockLatch(&pMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
|
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
if (ppTask) {
|
if (ppTask) {
|
||||||
taosHashRemove(pMeta->pTasksMap, keys, sizeof(keys));
|
// it is an fill-history task, remove the related stream task's id that points to it
|
||||||
|
if ((*ppTask)->info.fillHistory == 1) {
|
||||||
|
STaskId streamTaskId = {.streamId = (*ppTask)->streamTaskId.streamId, .taskId = (*ppTask)->streamTaskId.taskId};
|
||||||
|
SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &streamTaskId, sizeof(streamTaskId));
|
||||||
|
if (ppStreamTask != NULL) {
|
||||||
|
(*ppStreamTask)->historyTaskId.taskId = 0;
|
||||||
|
(*ppStreamTask)->historyTaskId.streamId = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
|
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
|
||||||
|
|
||||||
ASSERT(pTask->status.timerActive == 0);
|
ASSERT(pTask->status.timerActive == 0);
|
||||||
|
@ -550,7 +555,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMetaRemoveTask(pMeta, keys);
|
streamMetaRemoveTask(pMeta, &id);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
} else {
|
} else {
|
||||||
qDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", pMeta->vgId, taskId);
|
qDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", pMeta->vgId, taskId);
|
||||||
|
@ -639,8 +644,8 @@ static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) {
|
||||||
|
|
||||||
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
||||||
TBC* pCur = NULL;
|
TBC* pCur = NULL;
|
||||||
|
|
||||||
qInfo("vgId:%d load stream tasks from meta files", pMeta->vgId);
|
qInfo("vgId:%d load stream tasks from meta files", pMeta->vgId);
|
||||||
|
|
||||||
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
|
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
|
||||||
qError("vgId:%d failed to open stream meta, code:%s", pMeta->vgId, tstrerror(terrno));
|
qError("vgId:%d failed to open stream meta, code:%s", pMeta->vgId, tstrerror(terrno));
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -651,7 +656,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
||||||
void* pVal = NULL;
|
void* pVal = NULL;
|
||||||
int32_t vLen = 0;
|
int32_t vLen = 0;
|
||||||
SDecoder decoder;
|
SDecoder decoder;
|
||||||
SArray* pRecycleList = taosArrayInit(4, STREAM_TASK_KEY_LEN);
|
SArray* pRecycleList = taosArrayInit(4, sizeof(STaskId));
|
||||||
|
|
||||||
tdbTbcMoveToFirst(pCur);
|
tdbTbcMoveToFirst(pCur);
|
||||||
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
|
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
|
||||||
|
@ -678,18 +683,17 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
||||||
int32_t taskId = pTask->id.taskId;
|
int32_t taskId = pTask->id.taskId;
|
||||||
tFreeStreamTask(pTask);
|
tFreeStreamTask(pTask);
|
||||||
|
|
||||||
int64_t key[2] = {0};
|
STaskId id = extractStreamTaskKey(pTask);
|
||||||
extractStreamTaskKey(key, pTask);
|
|
||||||
|
|
||||||
taosArrayPush(pRecycleList, key);
|
taosArrayPush(pRecycleList, &id);
|
||||||
int32_t total = taosArrayGetSize(pRecycleList);
|
int32_t total = taosArrayGetSize(pRecycleList);
|
||||||
qDebug("s-task:0x%x is already dropped, add into recycle list, total:%d", taskId, total);
|
qDebug("s-task:0x%x is already dropped, add into recycle list, total:%d", taskId, total);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// do duplicate task check.
|
// do duplicate task check.
|
||||||
int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId};
|
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
|
||||||
void* p = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
|
void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
// pTask->chkInfo.checkpointVer may be 0, when a follower is become a leader
|
// pTask->chkInfo.checkpointVer may be 0, when a follower is become a leader
|
||||||
// In this case, we try not to start fill-history task anymore.
|
// In this case, we try not to start fill-history task anymore.
|
||||||
|
@ -707,21 +711,23 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosHashPut(pMeta->pTasksMap, keys, sizeof(keys), &pTask, sizeof(void*)) < 0) {
|
if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES) < 0) {
|
||||||
doClear(pKey, pVal, pCur, pRecycleList);
|
doClear(pKey, pVal, pCur, pRecycleList);
|
||||||
tFreeStreamTask(pTask);
|
tFreeStreamTask(pTask);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pTask->info.fillHistory == 0) {
|
||||||
|
atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
|
||||||
|
}
|
||||||
|
|
||||||
if (streamTaskShouldPause(&pTask->status)) {
|
if (streamTaskShouldPause(&pTask->status)) {
|
||||||
atomic_add_fetch_32(&pMeta->pauseTaskNum, 1);
|
atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pTask->status.downstreamReady == 0);
|
ASSERT(pTask->status.downstreamReady == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
qInfo("vgId:%d pause task num:%d", pMeta->vgId, pMeta->pauseTaskNum);
|
|
||||||
|
|
||||||
tdbFree(pKey);
|
tdbFree(pKey);
|
||||||
tdbFree(pVal);
|
tdbFree(pVal);
|
||||||
if (tdbTbcClose(pCur) < 0) {
|
if (tdbTbcClose(pCur) < 0) {
|
||||||
|
@ -731,13 +737,14 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
||||||
|
|
||||||
if (taosArrayGetSize(pRecycleList) > 0) {
|
if (taosArrayGetSize(pRecycleList) > 0) {
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) {
|
||||||
int64_t* pId = taosArrayGet(pRecycleList, i);
|
STaskId* pId = taosArrayGet(pRecycleList, i);
|
||||||
streamMetaRemoveTask(pMeta, pId);
|
streamMetaRemoveTask(pMeta, pId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||||
qDebug("vgId:%d load %d tasks into meta from disk completed", pMeta->vgId, numOfTasks);
|
qDebug("vgId:%d load %d tasks into meta from disk completed, streamTask:%d, paused:%d", pMeta->vgId, numOfTasks,
|
||||||
|
pMeta->numOfStreamTasks, pMeta->numOfPausedTasks);
|
||||||
taosArrayDestroy(pRecycleList);
|
taosArrayDestroy(pRecycleList);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -749,8 +756,8 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
|
||||||
|
|
||||||
for (int32_t i = 0; i < pReq->numOfTasks; ++i) {
|
for (int32_t i = 0; i < pReq->numOfTasks; ++i) {
|
||||||
STaskStatusEntry* ps = taosArrayGet(pReq->pTaskStatus, i);
|
STaskStatusEntry* ps = taosArrayGet(pReq->pTaskStatus, i);
|
||||||
if (tEncodeI64(pEncoder, ps->streamId) < 0) return -1;
|
if (tEncodeI64(pEncoder, ps->id.streamId) < 0) return -1;
|
||||||
if (tEncodeI32(pEncoder, ps->taskId) < 0) return -1;
|
if (tEncodeI32(pEncoder, ps->id.taskId) < 0) return -1;
|
||||||
if (tEncodeI32(pEncoder, ps->status) < 0) return -1;
|
if (tEncodeI32(pEncoder, ps->status) < 0) return -1;
|
||||||
}
|
}
|
||||||
tEndEncode(pEncoder);
|
tEndEncode(pEncoder);
|
||||||
|
@ -765,8 +772,11 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
|
||||||
pReq->pTaskStatus = taosArrayInit(pReq->numOfTasks, sizeof(STaskStatusEntry));
|
pReq->pTaskStatus = taosArrayInit(pReq->numOfTasks, sizeof(STaskStatusEntry));
|
||||||
for (int32_t i = 0; i < pReq->numOfTasks; ++i) {
|
for (int32_t i = 0; i < pReq->numOfTasks; ++i) {
|
||||||
STaskStatusEntry hb = {0};
|
STaskStatusEntry hb = {0};
|
||||||
if (tDecodeI64(pDecoder, &hb.streamId) < 0) return -1;
|
if (tDecodeI64(pDecoder, &hb.id.streamId) < 0) return -1;
|
||||||
if (tDecodeI32(pDecoder, &hb.taskId) < 0) return -1;
|
int32_t taskId = 0;
|
||||||
|
if (tDecodeI32(pDecoder, &taskId) < 0) return -1;
|
||||||
|
|
||||||
|
hb.id.taskId = taskId;
|
||||||
if (tDecodeI32(pDecoder, &hb.status) < 0) return -1;
|
if (tDecodeI32(pDecoder, &hb.status) < 0) return -1;
|
||||||
|
|
||||||
taosArrayPush(pReq->pTaskStatus, &hb);
|
taosArrayPush(pReq->pTaskStatus, &hb);
|
||||||
|
@ -831,15 +841,14 @@ void metaHbToMnode(void* param, void* tmrId) {
|
||||||
hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry));
|
hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry));
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfTasks; ++i) {
|
for (int32_t i = 0; i < numOfTasks; ++i) {
|
||||||
SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
|
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
|
||||||
|
|
||||||
int64_t keys[2] = {pId->streamId, pId->taskId};
|
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
|
||||||
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
|
|
||||||
if ((*pTask)->info.fillHistory == 1) {
|
if ((*pTask)->info.fillHistory == 1) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
STaskStatusEntry entry = {.streamId = pId->streamId, .taskId = pId->taskId, .status = (*pTask)->status.taskStatus};
|
STaskStatusEntry entry = {.id = *pId, .status = (*pTask)->status.taskStatus};
|
||||||
taosArrayPush(hbMsg.pTaskStatus, &entry);
|
taosArrayPush(hbMsg.pTaskStatus, &entry);
|
||||||
|
|
||||||
if (!hasValEpset) {
|
if (!hasValEpset) {
|
||||||
|
|
|
@ -21,8 +21,7 @@
|
||||||
|
|
||||||
typedef struct SStreamTaskRetryInfo {
|
typedef struct SStreamTaskRetryInfo {
|
||||||
SStreamMeta* pMeta;
|
SStreamMeta* pMeta;
|
||||||
int32_t taskId;
|
STaskId id;
|
||||||
int64_t streamId;
|
|
||||||
} SStreamTaskRetryInfo;
|
} SStreamTaskRetryInfo;
|
||||||
|
|
||||||
static int32_t streamSetParamForScanHistory(SStreamTask* pTask);
|
static int32_t streamSetParamForScanHistory(SStreamTask* pTask);
|
||||||
|
@ -74,6 +73,7 @@ const char* streamGetTaskStatusStr(int32_t status) {
|
||||||
case TASK_STATUS__CK: return "check-point";
|
case TASK_STATUS__CK: return "check-point";
|
||||||
case TASK_STATUS__DROPPING: return "dropping";
|
case TASK_STATUS__DROPPING: return "dropping";
|
||||||
case TASK_STATUS__STOP: return "stop";
|
case TASK_STATUS__STOP: return "stop";
|
||||||
|
case TASK_STATUS__UNINIT: return "uninitialized";
|
||||||
default:return "";
|
default:return "";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -245,6 +245,7 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
|
||||||
ASSERT(pTask->historyTaskId.taskId == 0);
|
ASSERT(pTask->historyTaskId.taskId == 0);
|
||||||
} else {
|
} else {
|
||||||
qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str);
|
qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str);
|
||||||
|
streamTaskEnablePause(pTask);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -520,12 +521,10 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
|
||||||
SStreamTaskRetryInfo* pInfo = param;
|
SStreamTaskRetryInfo* pInfo = param;
|
||||||
SStreamMeta* pMeta = pInfo->pMeta;
|
SStreamMeta* pMeta = pInfo->pMeta;
|
||||||
|
|
||||||
qDebug("s-task:0x%x in timer to launch related history task", pInfo->taskId);
|
qDebug("s-task:0x%x in timer to launch related history task", (int32_t) pInfo->id.taskId);
|
||||||
|
|
||||||
taosWLockLatch(&pMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
int64_t keys[2] = {pInfo->streamId, pInfo->taskId};
|
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pInfo->id, sizeof(pInfo->id));
|
||||||
|
|
||||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
|
|
||||||
if (ppTask) {
|
if (ppTask) {
|
||||||
ASSERT((*ppTask)->status.timerActive >= 1);
|
ASSERT((*ppTask)->status.timerActive >= 1);
|
||||||
|
|
||||||
|
@ -541,7 +540,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
|
||||||
}
|
}
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->streamId, pInfo->taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->id.streamId, pInfo->id.taskId);
|
||||||
if (pTask != NULL) {
|
if (pTask != NULL) {
|
||||||
ASSERT(pTask->status.timerActive >= 1);
|
ASSERT(pTask->status.timerActive >= 1);
|
||||||
|
|
||||||
|
@ -552,7 +551,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
|
||||||
qWarn(
|
qWarn(
|
||||||
"s-task:%s vgId:%d status:%s failed to launch history task:0x%x, since it may not be built, or may have been "
|
"s-task:%s vgId:%d status:%s failed to launch history task:0x%x, since it may not be built, or may have been "
|
||||||
"destroyed, or should stop",
|
"destroyed, or should stop",
|
||||||
pTask->id.idStr, pMeta->vgId, pStatus, pTask->historyTaskId.taskId);
|
pTask->id.idStr, pMeta->vgId, pStatus, (int32_t) pTask->historyTaskId.taskId);
|
||||||
|
|
||||||
taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
|
taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
@ -568,7 +567,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
|
||||||
atomic_sub_fetch_8(&pTask->status.timerActive, 1);
|
atomic_sub_fetch_8(&pTask->status.timerActive, 1);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
} else {
|
} else {
|
||||||
qError("s-task:0x%x failed to load task, it may have been destroyed", pInfo->taskId);
|
qError("s-task:0x%x failed to load task, it may have been destroyed", (int32_t) pInfo->id.taskId);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(pInfo);
|
taosMemoryFree(pInfo);
|
||||||
|
@ -587,17 +586,15 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
|
||||||
qDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", pTask->id.idStr,
|
qDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", pTask->id.idStr,
|
||||||
pTask->historyTaskId.streamId, hTaskId);
|
pTask->historyTaskId.streamId, hTaskId);
|
||||||
|
|
||||||
int64_t keys[2] = {pTask->historyTaskId.streamId, hTaskId};
|
|
||||||
|
|
||||||
// Set the execute conditions, including the query time window and the version range
|
// Set the execute conditions, including the query time window and the version range
|
||||||
SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
|
SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->historyTaskId, sizeof(pTask->historyTaskId));
|
||||||
if (pHTask == NULL) {
|
if (pHTask == NULL) {
|
||||||
qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr,
|
qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr,
|
||||||
pMeta->vgId, hTaskId);
|
pMeta->vgId, hTaskId);
|
||||||
|
|
||||||
SStreamTaskRetryInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamTaskRetryInfo));
|
SStreamTaskRetryInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamTaskRetryInfo));
|
||||||
pInfo->taskId = pTask->id.taskId;
|
pInfo->id.taskId = pTask->id.taskId;
|
||||||
pInfo->streamId = pTask->id.streamId;
|
pInfo->id.streamId = pTask->id.streamId;
|
||||||
pInfo->pMeta = pTask->pMeta;
|
pInfo->pMeta = pTask->pMeta;
|
||||||
|
|
||||||
if (pTask->launchTaskTimer == NULL) {
|
if (pTask->launchTaskTimer == NULL) {
|
||||||
|
@ -823,7 +820,7 @@ void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if(pTask->info.taskLevel == TASK_LEVEL__SINK) {
|
if(pTask->info.taskLevel == TASK_LEVEL__SINK) {
|
||||||
int32_t num = atomic_add_fetch_32(&pMeta->pauseTaskNum, 1);
|
int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
|
||||||
qInfo("vgId:%d s-task:%s pause stream sink task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
|
qInfo("vgId:%d s-task:%s pause stream sink task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -857,7 +854,7 @@ void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) {
|
||||||
|
|
||||||
atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
|
atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
|
||||||
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
|
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
|
||||||
int32_t num = atomic_add_fetch_32(&pMeta->pauseTaskNum, 1);
|
int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
|
||||||
qInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
|
qInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
|
@ -877,10 +874,10 @@ void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta) {
|
||||||
if (status == TASK_STATUS__PAUSE) {
|
if (status == TASK_STATUS__PAUSE) {
|
||||||
pTask->status.taskStatus = pTask->status.keepTaskStatus;
|
pTask->status.taskStatus = pTask->status.keepTaskStatus;
|
||||||
pTask->status.keepTaskStatus = TASK_STATUS__NORMAL;
|
pTask->status.keepTaskStatus = TASK_STATUS__NORMAL;
|
||||||
int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1);
|
int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
|
||||||
qInfo("vgId:%d s-task:%s resume from pause, status:%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num);
|
qInfo("vgId:%d s-task:%s resume from pause, status:%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num);
|
||||||
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
|
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
|
||||||
int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1);
|
int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
|
||||||
qInfo("vgId:%d s-task:%s sink task.resume from pause, status:%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num);
|
qInfo("vgId:%d s-task:%s sink task.resume from pause, status:%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num);
|
||||||
} else {
|
} else {
|
||||||
qError("s-task:%s not in pause, failed to resume, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
|
qError("s-task:%s not in pause, failed to resume, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
|
||||||
|
|
|
@ -97,9 +97,12 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
|
||||||
if (tEncodeI8(pEncoder, pTask->info.fillHistory) < 0) return -1;
|
if (tEncodeI8(pEncoder, pTask->info.fillHistory) < 0) return -1;
|
||||||
|
|
||||||
if (tEncodeI64(pEncoder, pTask->historyTaskId.streamId)) return -1;
|
if (tEncodeI64(pEncoder, pTask->historyTaskId.streamId)) return -1;
|
||||||
if (tEncodeI32(pEncoder, pTask->historyTaskId.taskId)) return -1;
|
int32_t taskId = pTask->historyTaskId.taskId;
|
||||||
|
if (tEncodeI32(pEncoder, taskId)) return -1;
|
||||||
|
|
||||||
if (tEncodeI64(pEncoder, pTask->streamTaskId.streamId)) return -1;
|
if (tEncodeI64(pEncoder, pTask->streamTaskId.streamId)) return -1;
|
||||||
if (tEncodeI32(pEncoder, pTask->streamTaskId.taskId)) return -1;
|
taskId = pTask->streamTaskId.taskId;
|
||||||
|
if (tEncodeI32(pEncoder, taskId)) return -1;
|
||||||
|
|
||||||
if (tEncodeU64(pEncoder, pTask->dataRange.range.minVer)) return -1;
|
if (tEncodeU64(pEncoder, pTask->dataRange.range.minVer)) return -1;
|
||||||
if (tEncodeU64(pEncoder, pTask->dataRange.range.maxVer)) return -1;
|
if (tEncodeU64(pEncoder, pTask->dataRange.range.maxVer)) return -1;
|
||||||
|
@ -141,6 +144,8 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
||||||
|
int32_t taskId = 0;
|
||||||
|
|
||||||
if (tStartDecode(pDecoder) < 0) return -1;
|
if (tStartDecode(pDecoder) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pTask->ver) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pTask->ver) < 0) return -1;
|
||||||
if (pTask->ver != SSTREAM_TASK_VER) return -1;
|
if (pTask->ver != SSTREAM_TASK_VER) return -1;
|
||||||
|
@ -165,9 +170,12 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
||||||
if (tDecodeI8(pDecoder, &pTask->info.fillHistory) < 0) return -1;
|
if (tDecodeI8(pDecoder, &pTask->info.fillHistory) < 0) return -1;
|
||||||
|
|
||||||
if (tDecodeI64(pDecoder, &pTask->historyTaskId.streamId)) return -1;
|
if (tDecodeI64(pDecoder, &pTask->historyTaskId.streamId)) return -1;
|
||||||
if (tDecodeI32(pDecoder, &pTask->historyTaskId.taskId)) return -1;
|
if (tDecodeI32(pDecoder, &taskId)) return -1;
|
||||||
|
pTask->historyTaskId.taskId = taskId;
|
||||||
|
|
||||||
if (tDecodeI64(pDecoder, &pTask->streamTaskId.streamId)) return -1;
|
if (tDecodeI64(pDecoder, &pTask->streamTaskId.streamId)) return -1;
|
||||||
if (tDecodeI32(pDecoder, &pTask->streamTaskId.taskId)) return -1;
|
if (tDecodeI32(pDecoder, &taskId)) return -1;
|
||||||
|
pTask->streamTaskId.taskId = taskId;
|
||||||
|
|
||||||
if (tDecodeU64(pDecoder, &pTask->dataRange.range.minVer)) return -1;
|
if (tDecodeU64(pDecoder, &pTask->dataRange.range.minVer)) return -1;
|
||||||
if (tDecodeU64(pDecoder, &pTask->dataRange.range.maxVer)) return -1;
|
if (tDecodeU64(pDecoder, &pTask->dataRange.range.maxVer)) return -1;
|
||||||
|
@ -251,15 +259,19 @@ int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo)
|
||||||
tEndDecode(pDecoder);
|
tEndDecode(pDecoder);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
int32_t tDecodeStreamTaskId(SDecoder* pDecoder, SStreamTaskId* pTaskId) {
|
|
||||||
|
int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId) {
|
||||||
int64_t ver;
|
int64_t ver;
|
||||||
if (tStartDecode(pDecoder) < 0) return -1;
|
if (tStartDecode(pDecoder) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &ver) < 0) return -1;
|
if (tDecodeI64(pDecoder, &ver) < 0) return -1;
|
||||||
if (ver != SSTREAM_TASK_VER) return -1;
|
if (ver != SSTREAM_TASK_VER) return -1;
|
||||||
|
|
||||||
if (tDecodeI64(pDecoder, &pTaskId->streamId) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pTaskId->streamId) < 0) return -1;
|
||||||
if (tDecodeI32(pDecoder, &pTaskId->taskId) < 0) return -1;
|
|
||||||
|
|
||||||
|
int32_t taskId = 0;
|
||||||
|
if (tDecodeI32(pDecoder, &taskId) < 0) return -1;
|
||||||
|
|
||||||
|
pTaskId->taskId = taskId;
|
||||||
tEndDecode(pDecoder);
|
tEndDecode(pDecoder);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -478,8 +490,8 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS
|
||||||
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i);
|
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i);
|
||||||
if (pInfo->nodeId == nodeId) {
|
if (pInfo->nodeId == nodeId) {
|
||||||
epsetAssign(&pInfo->epSet, pEpSet);
|
epsetAssign(&pInfo->epSet, pEpSet);
|
||||||
qDebug("s-task:0x%x update the upstreamInfo, nodeId:%d taskId:0x%x newEpset:%s", pTask->id.taskId, nodeId,
|
qDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s", pTask->id.taskId,
|
||||||
pInfo->taskId, buf);
|
pInfo->taskId, nodeId, buf);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -509,7 +521,8 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
|
||||||
|
|
||||||
if (pVgInfo->vgId == nodeId) {
|
if (pVgInfo->vgId == nodeId) {
|
||||||
epsetAssign(&pVgInfo->epSet, pEpSet);
|
epsetAssign(&pVgInfo->epSet, pEpSet);
|
||||||
qDebug("s-task:0x%x update the dispatch info, nodeId:%d newEpset:%s", pTask->id.taskId, nodeId, buf);
|
qDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpset:%s", pTask->id.taskId,
|
||||||
|
pVgInfo->taskId, nodeId, buf);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -517,7 +530,8 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
|
||||||
STaskDispatcherFixedEp* pDispatcher = &pTask->fixedEpDispatcher;
|
STaskDispatcherFixedEp* pDispatcher = &pTask->fixedEpDispatcher;
|
||||||
if (pDispatcher->nodeId == nodeId) {
|
if (pDispatcher->nodeId == nodeId) {
|
||||||
epsetAssign(&pDispatcher->epSet, pEpSet);
|
epsetAssign(&pDispatcher->epSet, pEpSet);
|
||||||
qDebug("s-task:0x%x update the dispatch info, nodeId:%d newEpSet:%s", pTask->id.taskId, nodeId, buf);
|
qDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpSet:%s", pTask->id.taskId,
|
||||||
|
pDispatcher->taskId, nodeId, buf);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// do nothing
|
// do nothing
|
||||||
|
@ -567,17 +581,19 @@ int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) {
|
||||||
|
|
||||||
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) {
|
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) {
|
||||||
STaskExecStatisInfo* p = &pTask->taskExecInfo;
|
STaskExecStatisInfo* p = &pTask->taskExecInfo;
|
||||||
qDebug("s-task:%s update task nodeEp epset, update count:%d, prevTs:%"PRId64, pTask->id.idStr,
|
|
||||||
p->updateCount + 1, p->latestUpdateTs);
|
|
||||||
|
|
||||||
p->updateCount += 1;
|
int32_t numOfNodes = taosArrayGetSize(pNodeList);
|
||||||
|
int64_t prevTs = p->latestUpdateTs;
|
||||||
|
|
||||||
p->latestUpdateTs = taosGetTimestampMs();
|
p->latestUpdateTs = taosGetTimestampMs();
|
||||||
|
p->updateCount += 1;
|
||||||
|
qDebug("s-task:%s update task nodeEp epset, updatedNodes:%d, updateCount:%d, prevTs:%" PRId64, pTask->id.idStr,
|
||||||
|
numOfNodes, p->updateCount, prevTs);
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) {
|
||||||
SNodeUpdateInfo* pInfo = taosArrayGet(pNodeList, i);
|
SNodeUpdateInfo* pInfo = taosArrayGet(pNodeList, i);
|
||||||
doUpdateTaskEpset(pTask, pInfo->nodeId, &pInfo->newEp);
|
doUpdateTaskEpset(pTask, pInfo->nodeId, &pInfo->newEp);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -649,3 +665,8 @@ int32_t streamBuildAndSendDropTaskMsg(SStreamTask* pTask, int32_t vgId, SStreamT
|
||||||
qDebug("vgId:%d build and send drop table:0x%x msg", vgId, pTaskId->taskId);
|
qDebug("vgId:%d build and send drop table:0x%x msg", vgId, pTaskId->taskId);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
STaskId extractStreamTaskKey(const SStreamTask* pTask) {
|
||||||
|
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
|
||||||
|
return id;
|
||||||
|
}
|
Loading…
Reference in New Issue