From 5c128e2292c2c76c7f377797bcb53f83c0dcbd84 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 28 Mar 2023 09:27:48 +0800 Subject: [PATCH 1/2] refactor: do some internal refactor. --- include/util/tarray.h | 16 ----- source/dnode/mnode/impl/src/mndConsumer.c | 9 ++- source/dnode/vnode/src/tq/tq.c | 13 +++- source/dnode/vnode/src/tq/tqExec.c | 19 +++--- source/libs/executor/src/executorimpl.c | 2 +- source/util/src/tarray.c | 75 +---------------------- 6 files changed, 34 insertions(+), 100 deletions(-) diff --git a/include/util/tarray.h b/include/util/tarray.h index a8510e4bc8..278f9f6bab 100644 --- a/include/util/tarray.h +++ b/include/util/tarray.h @@ -69,14 +69,6 @@ void* taosArrayAddBatch(SArray* pArray, const void* pData, int32_t nEles); */ void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*)); -/** - * - * @param pArray - * @param comparFn - * @param fp - */ -void taosArrayRemoveDuplicateP(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*)); - /** * add all element from the source array list into the destination * @param pArray @@ -252,14 +244,6 @@ void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* par int32_t taosEncodeArray(void** buf, const SArray* pArray, FEncode encode); void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t dataSz); -/** - * swap array - * @param a - * @param b - * @return - */ -void taosArraySwap(SArray* a, SArray* b); - #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 2f560910d4..616f69cd5b 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -601,6 +601,13 @@ static void* topicNameDup(void* p){ return taosStrdup((char*) p); } +static void freeItem(void* param) { + void* pItem = *(void**)param; + if (pItem != NULL) { + taosMemoryFree(pItem); + } +} + int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; char *msgStr = pMsg->pCont; @@ -616,7 +623,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { int32_t code = -1; SArray *pTopicList = subscribe.topicNames; taosArraySort(pTopicList, taosArrayCompareString); - taosArrayRemoveDuplicateP(pTopicList, taosArrayCompareString, taosMemoryFree); + taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem); int32_t newTopicNum = taosArrayGetSize(pTopicList); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index de2732fcb5..5210a8cc66 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -885,6 +885,12 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId); + // kill executing task + qTaskInfo_t pTaskInfo = pHandle->execHandle.task; + if (pTaskInfo != NULL) { +// qAsyncKillTask(pTaskInfo); + } + taosWLockLatch(&pTq->lock); atomic_store_32(&pHandle->epoch, -1); @@ -895,7 +901,12 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg atomic_add_fetch_32(&pHandle->epoch, 1); if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - qStreamCloseTsdbReader(pHandle->execHandle.task); + qStreamCloseTsdbReader(pTaskInfo); + } + + // reset the error code. + if (pHandle->execHandle.task != NULL) { + } taosWUnLockLatch(&pTq->lock); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index a62101eb47..ce86a6757e 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -65,16 +65,17 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs const STqExecHandle* pExec = &pHandle->execHandle; qTaskInfo_t task = pExec->task; + int32_t vgId = TD_VID(pTq->pVnode); if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) { - tqDebug("prepare scan failed, return"); + tqDebug("prepare scan failed, return, consumer:0x%"PRIx64, pHandle->consumerId); if (pOffset->type == TMQ_OFFSET__LOG) { pRsp->rspOffset = *pOffset; return 0; } else { tqOffsetResetToLog(pOffset, pHandle->snapshotVer); if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) { - tqDebug("prepare scan failed, return"); + tqDebug("prepare scan failed, return, consumer:0x%"PRIx64, pHandle->consumerId); pRsp->rspOffset = *pOffset; return 0; } @@ -86,13 +87,14 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; - tqDebug("vgId:%d, tmq task start to execute", pTq->pVnode->config.vgId); + tqDebug("vgId:%d, tmq task start to execute, consumer:0x%"PRIx64, vgId, pHandle->consumerId); if (qExecTask(task, &pDataBlock, &ts) < 0) { - tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, terrstr()); + tqError("vgId:%d, task exec error since %s, consumer:0x%" PRIx64, vgId, terrstr(), + pHandle->consumerId); return -1; } - tqDebug("consumer:0x%"PRIx64" vgId:%d, tmq task executed, get %p", pHandle->consumerId, pTq->pVnode->config.vgId, pDataBlock); + tqDebug("consumer:0x%"PRIx64" vgId:%d, tmq task executed, get %p", pHandle->consumerId, vgId, pDataBlock); // current scan should be stopped asap, since the rebalance occurs. if (pDataBlock == NULL) { @@ -115,15 +117,16 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs } if (pRsp->rspOffset.type == 0) { - tqError("expected rsp offset: type %d %" PRId64 " %" PRId64 " %" PRId64, pRsp->rspOffset.type, pRsp->rspOffset.ts, - pRsp->rspOffset.uid, pRsp->rspOffset.version); + tqError("vgId:%d, expected rsp offset: type %d %" PRId64 " %" PRId64 " %" PRId64, vgId, pRsp->rspOffset.type, + pRsp->rspOffset.ts, pRsp->rspOffset.uid, pRsp->rspOffset.version); return -1; } if (pRsp->withTbName || pRsp->withSchema) { - tqError("get column should not with meta:%d,%d", pRsp->withTbName, pRsp->withSchema); + tqError("vgId:%d, get column should not with meta:%d,%d", vgId, pRsp->withTbName, pRsp->withSchema); return -1; } + return 0; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 67174c3267..24a26d575a 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -633,7 +633,7 @@ void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pB } } -bool isTaskKilled(SExecTaskInfo* pTaskInfo) { return (0 != pTaskInfo->code) ? true : false; } +bool isTaskKilled(SExecTaskInfo* pTaskInfo) { return (0 != pTaskInfo->code);} void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) { pTaskInfo->code = rspCode; } diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 7467fa2948..6c1d3e17bb 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -140,7 +140,7 @@ void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp) taosArraySet(pArray, pos + 1, p2); memset(TARRAY_GET_ELEM(pArray, i), 0, pArray->elemSize); - pos += 1; + pos += 1; } else { pos += 1; } @@ -157,45 +157,6 @@ void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp) pArray->size = pos + 1; } -void taosArrayRemoveDuplicateP(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*)) { - size_t size = pArray->size; - if (size <= 1) { - return; - } - - int32_t pos = 0; - for (int32_t i = 1; i < size; ++i) { - char* p1 = taosArrayGet(pArray, pos); - char* p2 = taosArrayGet(pArray, i); - - if (comparFn(p1, p2) == 0) { - // do nothing - } else { - if (pos + 1 != i) { - void* p = taosArrayGetP(pArray, pos + 1); - if (fp != NULL) { - fp(p); - } - - taosArraySet(pArray, pos + 1, p2); - memset(TARRAY_GET_ELEM(pArray, i), 0, pArray->elemSize); - pos += 1; - } else { - pos += 1; - } - } - } - - if (fp != NULL) { - for (int32_t i = pos + 1; i < pArray->size; ++i) { - void* p = taosArrayGetP(pArray, i); - fp(p); - } - } - - pArray->size = pos + 1; -} - void* taosArrayAddAll(SArray* pArray, const SArray* pInput) { if (pInput) { return taosArrayAddBatch(pArray, pInput->pData, (int32_t)taosArrayGetSize(pInput)); @@ -392,20 +353,6 @@ void taosArrayClearEx(SArray* pArray, void (*fp)(void*)) { pArray->size = 0; } -void taosArrayClearP(SArray* pArray, FDelete fp) { - if (pArray == NULL) return; - if (fp == NULL) { - pArray->size = 0; - return; - } - - for (int32_t i = 0; i < pArray->size; ++i) { - fp(*(void**)TARRAY_GET_ELEM(pArray, i)); - } - - pArray->size = 0; -} - void* taosArrayDestroy(SArray* pArray) { if (pArray) { taosMemoryFree(pArray->pData); @@ -495,6 +442,7 @@ static void taosArrayInsertSort(SArray* pArray, __ext_compar_fn_t fn, const void if (pArray->size <= 1) { return; } + for (int32_t i = 1; i <= pArray->size - 1; ++i) { for (int32_t j = i; j > 0; --j) { if (fn(taosArrayGetP(pArray, j), taosArrayGetP(pArray, j - 1), param) == -1) { @@ -507,7 +455,6 @@ static void taosArrayInsertSort(SArray* pArray, __ext_compar_fn_t fn, const void } } } - return; } int32_t taosEncodeArray(void** buf, const SArray* pArray, FEncode encode) { @@ -539,21 +486,3 @@ void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* par taosArrayGetSize(pArray) > 8 ? taosArrayQuickSort(pArray, fn, param) : taosArrayInsertSort(pArray, fn, param); } -void taosArraySwap(SArray* a, SArray* b) { - if (a == NULL || b == NULL) return; - size_t t = a->size; - a->size = b->size; - b->size = t; - - uint32_t cap = a->capacity; - a->capacity = b->capacity; - b->capacity = cap; - - uint32_t elem = a->elemSize; - a->elemSize = b->elemSize; - b->elemSize = elem; - - void* data = a->pData; - a->pData = b->pData; - b->pData = data; -} From c158087d66c5d4e7c19cfebc5504ad2a8d8e60a3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 28 Mar 2023 09:53:49 +0800 Subject: [PATCH 2/2] fix(tmq): kill the ongoing tsdb scans while transferring the ownership of vnode to other consumers. --- include/libs/executor/executor.h | 10 +--------- source/dnode/vnode/src/tq/tq.c | 7 +------ source/libs/executor/src/executor.c | 17 +++++++++++++++++ 3 files changed, 19 insertions(+), 15 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index e76422ee34..19a6407b77 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -149,7 +149,6 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table * @param handle * @return */ - int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch* pLocal); int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pBlock, uint64_t* useconds); @@ -162,6 +161,7 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo); * @return */ int32_t qAsyncKillTask(qTaskInfo_t tinfo, int32_t rspCode); +int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode); bool qTaskIsExecuting(qTaskInfo_t qinfo); @@ -171,14 +171,6 @@ bool qTaskIsExecuting(qTaskInfo_t qinfo); */ void qDestroyTask(qTaskInfo_t tinfo); -/** - * Extract the qualified table id list, and than pass them to the TSDB driver to load the required table data blocks. - * - * @param iter the table iterator to traverse all tables belongs to a super table, or an invert index - * @return - */ -int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t tagCondLen, SArray* pTableIdList); - void qProcessRspMsg(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet); int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList /*,int32_t* resNum, SExplainExecInfo** pRes*/); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 5210a8cc66..ccd8b885a0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -888,7 +888,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg // kill executing task qTaskInfo_t pTaskInfo = pHandle->execHandle.task; if (pTaskInfo != NULL) { -// qAsyncKillTask(pTaskInfo); + qKillTask(pTaskInfo, TSDB_CODE_SUCCESS); } taosWLockLatch(&pTq->lock); @@ -904,11 +904,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg qStreamCloseTsdbReader(pTaskInfo); } - // reset the error code. - if (pHandle->execHandle.task != NULL) { - - } - taosWUnLockLatch(&pTq->lock); if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { taosMemoryFree(req.qmsg); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 86fcd319c5..1ea2101aff 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -749,6 +749,23 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) { return TSDB_CODE_SUCCESS; } +int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + if (pTaskInfo == NULL) { + return TSDB_CODE_QRY_INVALID_QHANDLE; + } + + qDebug("%s execTask async killed", GET_TASKID(pTaskInfo)); + setTaskKilled(pTaskInfo, rspCode); + + while(qTaskIsExecuting(pTaskInfo)) { + taosMsleep(10); + } + + pTaskInfo->code = rspCode; + return TSDB_CODE_SUCCESS; +} + bool qTaskIsExecuting(qTaskInfo_t qinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo; if (NULL == pTaskInfo) {