From 15544a220759fa8f8f9709d3f4b8617878abb5da Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Thu, 11 Aug 2022 11:14:31 +0800 Subject: [PATCH] enh: rsma fetch --- include/common/tdatablock.h | 3 +- source/common/src/tdatablock.c | 16 +-- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 2 +- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 14 +-- source/dnode/vnode/src/sma/smaRollup.c | 108 ++++++++++---------- source/dnode/vnode/src/vnd/vnodeSvr.c | 4 +- source/libs/executor/src/executor.c | 3 +- source/util/src/terror.c | 4 +- 8 files changed, 75 insertions(+), 79 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 5b39bc854e..c77cb16e86 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -246,9 +246,10 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag); // for debug char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf); -int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataBlocks, STSchema* pTSchema, int32_t vgId, +int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, tb_uid_t suid); + char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) { diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index dba30bb876..bc05f7a1c3 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1870,20 +1870,20 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) * @brief TODO: Assume that the final generated result it less than 3M * * @param pReq - * @param pDataBlock + * @param pDataBlocks * @param vgId * @param suid - * + * */ -int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataBlock, STSchema* pTSchema, int32_t vgId, +int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, tb_uid_t suid) { + int32_t sz = taosArrayGetSize(pDataBlocks); int32_t bufSize = sizeof(SSubmitReq); - int32_t sz = 1; for (int32_t i = 0; i < sz; ++i) { - const SDataBlockInfo* pBlkInfo = &pDataBlock->info; + SDataBlockInfo* pBlkInfo = &((SSDataBlock*)taosArrayGet(pDataBlocks, i))->info; - int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); - bufSize += pBlkInfo->rows * (TD_ROW_HEAD_LEN + pBlkInfo->rowSize + BitmapLen(colNum)); + int32_t numOfCols = taosArrayGetSize(pDataBlocks); + bufSize += pBlkInfo->rows * (TD_ROW_HEAD_LEN + pBlkInfo->rowSize + BitmapLen(numOfCols)); bufSize += sizeof(SSubmitBlk); } @@ -1900,6 +1900,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataB tdSRowInit(&rb, pTSchema->version); for (int32_t i = 0; i < sz; ++i) { + SSDataBlock* pDataBlock = taosArrayGet(pDataBlocks, i); int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); int32_t rows = pDataBlock->info.rows; // int32_t rowSize = pDataBlock->info.rowSize; @@ -1997,7 +1998,6 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataB } offset += TYPE_BYTES[pCol->type]; // sum/avg would convert to int64_t/uint64_t/double during aggregation } - tdSRowEnd(&rb); dataLen += TD_ROW_LEN(rb.pBuf); #ifdef TD_DEBUG_PRINT_ROW tdSRowPrint(rb.pBuf, pTSchema, __func__); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index d13daffa08..7c6807ab87 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -337,6 +337,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; @@ -347,7 +348,6 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSMA, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 1a226abe5c..0a42f06081 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -54,7 +54,7 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { if (IsReq(pMsg)) { if (code != 0) { if (terrno != 0) code = terrno; - dGError("msg:%p, failed to process since %s", pMsg, terrstr()); + dGError("msg:%p, failed to process since %s", pMsg, terrstr(code)); } vmSendRsp(pMsg, code); } @@ -72,7 +72,7 @@ static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, pMsg); if (code != 0) { if (terrno != 0) code = terrno; - dGError("vgId:%d, msg:%p failed to query since %s", pVnode->vgId, pMsg, terrstr()); + dGError("vgId:%d, msg:%p failed to query since %s", pVnode->vgId, pMsg, terrstr(code)); vmSendRsp(pMsg, code); } @@ -89,7 +89,7 @@ static void vmProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo); if (code != 0) { if (terrno != 0) code = terrno; - dGError("vgId:%d, msg:%p failed to process stream since %s", pVnode->vgId, pMsg, terrstr()); + dGError("vgId:%d, msg:%p failed to process stream since %s", pVnode->vgId, pMsg, terrstr(code)); vmSendRsp(pMsg, code); } @@ -110,7 +110,7 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo); if (code != 0) { if (terrno != 0) code = terrno; - dGError("vgId:%d, msg:%p failed to fetch since %s", pVnode->vgId, pMsg, terrstr()); + dGError("vgId:%d, msg:%p failed to fetch since %s", pVnode->vgId, pMsg, terrstr(code)); vmSendRsp(pMsg, code); } @@ -156,7 +156,7 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp if ((pMsg->msgType == TDMT_SCH_QUERY) && (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS)) { terrno = TSDB_CODE_GRANT_EXPIRED; code = terrno; - dDebug("vgId:%d, msg:%p put into vnode-query queue failed since %s", pVnode->vgId, pMsg, terrstr()); + dDebug("vgId:%d, msg:%p put into vnode-query queue failed since %s", pVnode->vgId, pMsg, terrstr(code)); } else { vnodePreprocessQueryMsg(pVnode->pImpl, pMsg); dGTrace("vgId:%d, msg:%p put into vnode-query queue", pVnode->vgId, pMsg); @@ -179,11 +179,11 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp if (!osDataSpaceAvailable()) { terrno = TSDB_CODE_VND_NO_DISKSPACE; code = terrno; - dError("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr()); + dError("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr(code)); } else if ((pMsg->msgType == TDMT_VND_SUBMIT) && (grantCheck(TSDB_GRANT_STORAGE) != TSDB_CODE_SUCCESS)) { terrno = TSDB_CODE_VND_NO_WRITE_AUTH; code = terrno; - dDebug("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr()); + dDebug("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr(code)); } else { dGTrace("vgId:%d, msg:%p put into vnode-write queue", pVnode->vgId, pMsg); taosWriteQitem(pVnode->pWriteQ, pMsg); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 6b513f0242..801bc9c2e4 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -293,7 +293,9 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat if (pItem->maxDelay > TSDB_MAX_ROLLUP_MAX_DELAY) { pItem->maxDelay = TSDB_MAX_ROLLUP_MAX_DELAY; } + pItem->level = idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2; + taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId); smaInfo("vgId:%d, table:%" PRIi64 " level:%" PRIi8 " maxdelay:%" PRIi64 " watermark:%" PRIi64 ", finally maxdelay:%" PRIi32, TD_VID(pVnode), pRSmaInfo->suid, idx + 1, param->maxdelay[idx], param->watermark[idx], pItem->maxDelay); @@ -614,9 +616,13 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm uint64_t ts; int32_t code = qExecTaskOpt(taskInfo, pResList, &ts); if (code < 0) { - smaError("vgId:%d, qExecTask for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma), suid, - pItem->level, terrstr(code)); - goto _err; + if (code == TSDB_CODE_QRY_IN_EXEC) { + break; + } else { + smaError("vgId:%d, qExecTask for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma), suid, + pItem->level, terrstr(code)); + goto _err; + } } if (taosArrayGetSize(pResList) == 0) { @@ -630,37 +636,34 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm break; } - for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) { - SSDataBlock *output = taosArrayGetP(pResList, i); - #if 1 - char flag[10] = {0}; - snprintf(flag, 10, "level %" PRIi8, pItem->level); -// blockDebugShowDataBlocks(output, flag); -// taosArrayDestroy(pResult); + char flag[10] = {0}; + snprintf(flag, 10, "level %" PRIi8, pItem->level); + blockDebugShowDataBlocks(pResList, flag); #endif - STsdb * sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); - SSubmitReq *pReq = NULL; + STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); + SSubmitReq *pReq = NULL; - // TODO: the schema update should be handled later(TD-17965) - if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, SMA_VID(pSma), suid) < 0) { - smaError("vgId:%d, build submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s", - SMA_VID(pSma), suid, pItem->level, terrstr()); - goto _err; - } - - if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) { - taosMemoryFreeClear(pReq); - smaError("vgId:%d, process submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s", - SMA_VID(pSma), suid, pItem->level, terrstr()); - goto _err; - } - - smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " version:%" PRIi64, - SMA_VID(pSma), suid, pItem->level, output->info.version); - - taosMemoryFreeClear(pReq); + // TODO: the schema update should be handled later(TD-17965) + if (buildSubmitReqFromDataBlock(&pReq, pResList, pTSchema, SMA_VID(pSma), suid) < 0) { + smaError("vgId:%d, build submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma), + suid, pItem->level, terrstr()); + goto _err; } + + SSDataBlock *pBlk = (SSDataBlock *)taosArrayGet(pResList, 0); + if (pReq && tdProcessSubmitReq(sinkTsdb, pBlk->info.version, pReq) < 0) { + taosMemoryFreeClear(pReq); + smaError("vgId:%d, process submit req for rsma stable %" PRIi64 " level %" PRIi8 " version %" PRIi64 + " failed since %s", + SMA_VID(pSma), suid, pItem->level, terrstr()); + goto _err; + } + + smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " version %" PRIi64, SMA_VID(pSma), + suid, pItem->level); + + taosMemoryFreeClear(pReq); } taosArrayDestroy(pResList); @@ -692,15 +695,12 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType } SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx); - tdRSmaFetchAndSubmitResult(pSma, RSMA_INFO_QTASK(pInfo, idx), pItem, pInfo->pTSchema, suid, STREAM_INPUT__DATA_SUBMIT); atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE); if (smaMgmt.tmrHandle) { taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId); - } else { - ASSERT(0); } return TSDB_CODE_SUCCESS; @@ -746,7 +746,6 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) { return NULL; } - // clone the SRSmaInfo from iRsmaInfoHash to rsmaInfoHash if in committing stat SRSmaInfo *pCowRSmaInfo = NULL; // lock @@ -793,13 +792,7 @@ static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) { static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) { SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, suid); if (!pRSmaInfo) { - smaDebug("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid); - return TSDB_CODE_SUCCESS; - } - - if (!RSMA_INFO_QTASK(pRSmaInfo, 0)) { - tdReleaseRSmaInfo(pSma, pRSmaInfo); - smaDebug("vgId:%d, execute rsma, no rsma qTaskInfo for suid:%" PRIu64, SMA_VID(pSma), suid); + smaError("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid); return TSDB_CODE_SUCCESS; } @@ -1331,14 +1324,16 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { SRSmaInfo *pRSmaInfo = tdGetRSmaInfoByItem(pItem); if (RSMA_INFO_IS_DEL(pRSmaInfo)) { + smaDebug("rsma fetch task not start since rsma info already deleted, rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId, + pRSmaInfo->refId); return; } SRSmaStat *pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); if (!pStat) { - smaDebug("rsma fetch task not start since already destroyed, rsetId rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId, - pRSmaInfo->refId); + smaDebug("rsma fetch task not start since rsma stat already destroyed, rsetId:%" PRIi64 " refId:%d)", + smaMgmt.rsetId, pRSmaInfo->refId); return; } @@ -1350,8 +1345,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { case TASK_TRIGGER_STAT_PAUSED: case TASK_TRIGGER_STAT_CANCELLED: { tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); - smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data since stat is %" PRIi8 ", rsetId rsetId:%" PRIi64 - " refId:%d", + smaDebug("vgId:%d, rsma fetch task not start for level %" PRIi8 " since stat is %" PRIi8 + ", rsetId rsetId:%" PRIi64 " refId:%d", SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pRSmaInfo->refId); if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) { taosTmrReset(tdRSmaFetchTrigger, 5000, pItem, smaMgmt.tmrHandle, &pItem->tmrId); @@ -1366,30 +1361,31 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE); switch (fetchTriggerStat) { case TASK_TRIGGER_STAT_ACTIVE: { - smaDebug("vgId:%d, fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is active", SMA_VID(pSma), - pItem->level, pRSmaInfo->suid); + smaDebug("vgId:%d, rsma fetch task started for level:%" PRIi8 " suid:%" PRIi64 " since stat is active", + SMA_VID(pSma), pItem->level, pRSmaInfo->suid); // async process tdRSmaFetchSend(pSma, pRSmaInfo, pItem->level); } break; case TASK_TRIGGER_STAT_PAUSED: { - smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is paused", + smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is paused", SMA_VID(pSma), pItem->level, pRSmaInfo->suid); } break; case TASK_TRIGGER_STAT_INACTIVE: { - smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is inactive", + smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is inactive", SMA_VID(pSma), pItem->level, pRSmaInfo->suid); } break; case TASK_TRIGGER_STAT_INIT: { - smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is init", SMA_VID(pSma), - pItem->level, pRSmaInfo->suid); + smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid::%" PRIi64 " since stat is init", + SMA_VID(pSma), pItem->level, pRSmaInfo->suid); } break; default: { - smaWarn("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is unknown", + smaWarn("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is unknown", SMA_VID(pSma), pItem->level, pRSmaInfo->suid); } break; } _end: + taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId); tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); } @@ -1402,7 +1398,7 @@ _end: * @return int32_t */ int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) { - SRSmaFetchMsg fetchMsg = { .suid = pInfo->suid, .level = level}; + SRSmaFetchMsg fetchMsg = {.suid = pInfo->suid, .level = level}; int32_t ret = 0; int32_t contLen = 0; SEncoder encoder = {0}; @@ -1431,7 +1427,7 @@ int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) { .contLen = contLen, }; - if ((terrno = tmsgPutToQueue(&pSma->pVnode->msgCb, FETCH_QUEUE, &rpcMsg)) != 0) { + if ((terrno = tmsgPutToQueue(&pSma->pVnode->msgCb, QUERY_QUEUE, &rpcMsg)) != 0) { smaError("vgId:%d, failed to put rsma fetch msg into fetch-queue for suid:%" PRIi64 " level:%" PRIi8 " since %s", SMA_VID(pSma), pInfo->suid, level, terrstr()); goto _err; @@ -1462,7 +1458,7 @@ int32_t smaProcessFetch(SSma *pSma, void *pMsg) { if (!pRpcMsg || pRpcMsg->contLen < sizeof(SMsgHead)) { terrno = TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP; - return -1; + goto _err; } pBuf = POINTER_SHIFT(pRpcMsg->pCont, sizeof(SMsgHead)); @@ -1479,7 +1475,7 @@ int32_t smaProcessFetch(SSma *pSma, void *pMsg) { terrno = TSDB_CODE_RSMA_EMPTY_INFO; } smaWarn("vgId:%d, failed to process rsma fetch msg for suid:%" PRIi64 " level:%" PRIi8 " since %s", SMA_VID(pSma), - req.suid, req.level, terrstr()); + req.suid, req.level, terrstr()); goto _err; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 1f13bde0c1..2c0c7f7c88 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -293,6 +293,8 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0); case TDMT_SCH_QUERY_CONTINUE: return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0); + case TDMT_VND_FETCH_RSMA: + return smaProcessFetch(pVnode->pSma, pMsg); default: vError("unknown msg type:%d in query queue", pMsg->msgType); return TSDB_CODE_VND_APP_ERROR; @@ -329,8 +331,6 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { return vnodeGetTableCfg(pVnode, pMsg, true); case TDMT_VND_BATCH_META: return vnodeGetBatchMeta(pVnode, pMsg); - case TDMT_VND_FETCH_RSMA: - return smaProcessFetch(pVnode->pSma, pMsg); case TDMT_VND_CONSUME: return tqProcessPollReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_RUN: diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 1fc0f5ff51..fe278881be 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -507,7 +507,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { *pRes = NULL; int64_t curOwner = 0; if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) { - qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner); + qDebug("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner); pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC; return pTaskInfo->code; } @@ -529,7 +529,6 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { cleanUpUdfs(); qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code)); atomic_store_64(&pTaskInfo->owner, 0); - return pTaskInfo->code; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 8a3d0620ed..c2a467a2cb 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -119,7 +119,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DISCONNECTED, "Disconnected from ser TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_WRITE_AUTH, "No write permission") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_CONN_KILLED, "Connection killed") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_SQL_SYNTAX_ERROR, "Syntax error in SQL") -TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DB_NOT_SELECTED, "Database not specified or available") +TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DB_NOT_SELECTED, "TSC:Database not specified or available") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_TABLE_NAME, "Table does not exist") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_EXCEED_SQL_LIMIT, "SQL statement too long, check maxSQLLength config") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_FILE_EMPTY, "File is empty") @@ -220,7 +220,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_COMMENT, "Invalid func comment" TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_RETRIEVE, "Invalid func retrieve msg") // mnode-db -TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_NOT_SELECTED, "Database not specified or available") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_NOT_SELECTED, "MND:Database not specified or available") TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_ALREADY_EXIST, "Database already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB_OPTION, "Invalid database options") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB, "Invalid database name")