refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2023-08-22 15:43:50 +08:00
parent e6b49b45d9
commit af80d94a88
8 changed files with 29 additions and 26 deletions

View File

@ -97,6 +97,11 @@ enum {
STREAM_QUEUE__PROCESSING, STREAM_QUEUE__PROCESSING,
}; };
enum {
STREAM_META_WILL_STOP = 1,
STREAM_META_OK_TO_STOP = 2,
};
typedef struct { typedef struct {
int8_t type; int8_t type;
} SStreamQueueItem; } SStreamQueueItem;
@ -389,8 +394,8 @@ typedef struct SStreamMeta {
SHashObj* pTaskBackendUnique; SHashObj* pTaskBackendUnique;
TdThreadMutex backendMutex; TdThreadMutex backendMutex;
tmr_h hbTmr; tmr_h hbTmr;
// SMgmtInfo mgmtInfo;
int32_t killed;
int32_t closedTask; int32_t closedTask;
int32_t chkptNotReadyTasks; int32_t chkptNotReadyTasks;

View File

@ -738,7 +738,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
// if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; // if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;

View File

@ -2088,7 +2088,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
mDebug("stream:0x%" PRIx64 " involved node changed, create update trans", pStream->uid); mDebug("stream:0x%" PRIx64 " involved node changed, create update trans", pStream->uid);
int32_t code = createStreamUpdateTrans(pMnode, pStream, pChangeInfo); int32_t code = createStreamUpdateTrans(pMnode, pStream, pChangeInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
// todo return code;
} }
} }
@ -2160,6 +2160,7 @@ static void doExtractTasksFromStream(SMnode *pMnode) {
// this function runs by only one thread, so it is not multi-thread safe // this function runs by only one thread, so it is not multi-thread safe
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
int32_t code = 0;
int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1); int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1);
if (old != 0) { if (old != 0) {
mDebug("still in checking node change"); mDebug("still in checking node change");
@ -2189,16 +2190,18 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot); SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot);
if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) { if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
mndProcessVgroupChange(pMnode, &changeInfo); code = mndProcessVgroupChange(pMnode, &changeInfo);
} }
taosArrayDestroy(changeInfo.pUpdateNodeList); taosArrayDestroy(changeInfo.pUpdateNodeList);
taosHashCleanup(changeInfo.pDBMap); taosHashCleanup(changeInfo.pDBMap);
// keep the new vnode snapshot // keep the new vnode snapshot
if (code == TSDB_CODE_SUCCESS) {
taosArrayDestroy(execNodeList.pNodeEntryList); taosArrayDestroy(execNodeList.pNodeEntryList);
execNodeList.pNodeEntryList = pNodeSnapshot; execNodeList.pNodeEntryList = pNodeSnapshot;
execNodeList.ts = ts; execNodeList.ts = ts;
}
mDebug("end to do stream task node change checking"); mDebug("end to do stream task node change checking");
atomic_store_32(&mndNodeCheckSentinel, 0); atomic_store_32(&mndNodeCheckSentinel, 0);

View File

@ -190,7 +190,6 @@ static bool hasStreamTaskInTimer(SStreamMeta* pMeta) {
} }
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
return inTimer; return inTimer;
} }
@ -219,6 +218,12 @@ void tqNotifyClose(STQ* pTq) {
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
pMeta->killed = STREAM_META_WILL_STOP;
while(pMeta->killed != STREAM_META_OK_TO_STOP) {
taosMsleep(100);
tqDebug("vgId:%d wait for meta to stop timer", pMeta->vgId);
}
tqDebug("vgId:%d start to check all tasks", vgId); tqDebug("vgId:%d start to check all tasks", vgId);
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
@ -1432,7 +1437,6 @@ int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) {
return code; return code;
} }
// NOTE: the rsp msg should be kept in WAL file.
int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
@ -1461,20 +1465,6 @@ int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
"s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history " "s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history "
"completed msg", "completed msg",
pTask->id.idStr, req.downstreamId); pTask->id.idStr, req.downstreamId);
// the scan-history finish status should be recorded in the WAL files. So the transfer of the task status from
// scan-history
// to normal should be executed by write thread of each vnode.
// void* buf = NULL;
// int32_t tlen = 0;
// // encodeCreateChildTableForRPC(pReqs, TD_VID(pVnode), &buf, &tlen);
//
// SRpcMsg msg = {.msgType = TDMT_VND_CREATE_TABLE, .pCont = buf, .contLen = tlen};
// if (tmsgPutToQueue(&pTq->pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
// tqError("failed to put into write-queue since %s", terrstr());
// }
streamProcessScanHistoryFinishRsp(pTask); streamProcessScanHistoryFinishRsp(pTask);
} }

View File

@ -691,7 +691,6 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
} }
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
// blockDebugShowDataBlocks(data, __func__);
tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data); tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
} }

View File

@ -362,7 +362,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
streamTaskEnablePause(pStreamTask); streamTaskEnablePause(pStreamTask);
if (taosQueueEmpty(pStreamTask->inputQueue->queue)) { if (taosQueueEmpty(pStreamTask->inputQueue->queue)) {
SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
;
SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
pDelBlock->info.rows = 0; pDelBlock->info.rows = 0;
pDelBlock->info.version = 0; pDelBlock->info.version = 0;

View File

@ -607,6 +607,12 @@ void metaHbToMnode(void* param, void* tmrId) {
SStreamMeta* pMeta = param; SStreamMeta* pMeta = param;
SStreamHbMsg hbMsg = {0}; SStreamHbMsg hbMsg = {0};
if (pMeta->killed == STREAM_META_WILL_STOP) {
pMeta->killed = STREAM_META_OK_TO_STOP;
qDebug("vgId:%d jump out of meta timer", pMeta->vgId);
return;
}
taosRLockLatch(&pMeta->lock); taosRLockLatch(&pMeta->lock);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);

View File

@ -477,7 +477,7 @@ int32_t streamTaskStop(SStreamTask* pTask) {
pTask->status.taskStatus = TASK_STATUS__STOP; pTask->status.taskStatus = TASK_STATUS__STOP;
qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS); qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
while (pTask->status.schedStatus != TASK_SCHED_STATUS__INACTIVE) { while (/*pTask->status.schedStatus != TASK_SCHED_STATUS__INACTIVE */!streamTaskIsIdle(pTask)) {
qDebug("s-task:%s level:%d wait for task to be idle, check again in 100ms", id, pTask->info.taskLevel); qDebug("s-task:%s level:%d wait for task to be idle, check again in 100ms", id, pTask->info.taskLevel);
taosMsleep(100); taosMsleep(100);
} }