From 15544a220759fa8f8f9709d3f4b8617878abb5da Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Thu, 11 Aug 2022 11:14:31 +0800 Subject: [PATCH 1/9] enh: rsma fetch --- include/common/tdatablock.h | 3 +- source/common/src/tdatablock.c | 16 +-- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 2 +- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 14 +-- source/dnode/vnode/src/sma/smaRollup.c | 108 ++++++++++---------- source/dnode/vnode/src/vnd/vnodeSvr.c | 4 +- source/libs/executor/src/executor.c | 3 +- source/util/src/terror.c | 4 +- 8 files changed, 75 insertions(+), 79 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 5b39bc854e..c77cb16e86 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -246,9 +246,10 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag); // for debug char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf); -int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataBlocks, STSchema* pTSchema, int32_t vgId, +int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, tb_uid_t suid); + char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) { diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index dba30bb876..bc05f7a1c3 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1870,20 +1870,20 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) * @brief TODO: Assume that the final generated result it less than 3M * * @param pReq - * @param pDataBlock + * @param pDataBlocks * @param vgId * @param suid - * + * */ -int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataBlock, STSchema* pTSchema, int32_t vgId, +int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, tb_uid_t suid) { + int32_t sz = taosArrayGetSize(pDataBlocks); int32_t bufSize = sizeof(SSubmitReq); - int32_t sz = 1; for (int32_t i = 0; i < sz; ++i) { - const SDataBlockInfo* pBlkInfo = &pDataBlock->info; + SDataBlockInfo* pBlkInfo = &((SSDataBlock*)taosArrayGet(pDataBlocks, i))->info; - int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); - bufSize += pBlkInfo->rows * (TD_ROW_HEAD_LEN + pBlkInfo->rowSize + BitmapLen(colNum)); + int32_t numOfCols = taosArrayGetSize(pDataBlocks); + bufSize += pBlkInfo->rows * (TD_ROW_HEAD_LEN + pBlkInfo->rowSize + BitmapLen(numOfCols)); bufSize += sizeof(SSubmitBlk); } @@ -1900,6 +1900,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataB tdSRowInit(&rb, pTSchema->version); for (int32_t i = 0; i < sz; ++i) { + SSDataBlock* pDataBlock = taosArrayGet(pDataBlocks, i); int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); int32_t rows = pDataBlock->info.rows; // int32_t rowSize = pDataBlock->info.rowSize; @@ -1997,7 +1998,6 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataB } offset += TYPE_BYTES[pCol->type]; // sum/avg would convert to int64_t/uint64_t/double during aggregation } - tdSRowEnd(&rb); dataLen += TD_ROW_LEN(rb.pBuf); #ifdef TD_DEBUG_PRINT_ROW tdSRowPrint(rb.pBuf, pTSchema, __func__); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index d13daffa08..7c6807ab87 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -337,6 +337,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; @@ -347,7 +348,6 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSMA, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 1a226abe5c..0a42f06081 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -54,7 +54,7 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { if (IsReq(pMsg)) { if (code != 0) { if (terrno != 0) code = terrno; - dGError("msg:%p, failed to process since %s", pMsg, terrstr()); + dGError("msg:%p, failed to process since %s", pMsg, terrstr(code)); } vmSendRsp(pMsg, code); } @@ -72,7 +72,7 @@ static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, pMsg); if (code != 0) { if (terrno != 0) code = terrno; - dGError("vgId:%d, msg:%p failed to query since %s", pVnode->vgId, pMsg, terrstr()); + dGError("vgId:%d, msg:%p failed to query since %s", pVnode->vgId, pMsg, terrstr(code)); vmSendRsp(pMsg, code); } @@ -89,7 +89,7 @@ static void vmProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo); if (code != 0) { if (terrno != 0) code = terrno; - dGError("vgId:%d, msg:%p failed to process stream since %s", pVnode->vgId, pMsg, terrstr()); + dGError("vgId:%d, msg:%p failed to process stream since %s", pVnode->vgId, pMsg, terrstr(code)); vmSendRsp(pMsg, code); } @@ -110,7 +110,7 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo); if (code != 0) { if (terrno != 0) code = terrno; - dGError("vgId:%d, msg:%p failed to fetch since %s", pVnode->vgId, pMsg, terrstr()); + dGError("vgId:%d, msg:%p failed to fetch since %s", pVnode->vgId, pMsg, terrstr(code)); vmSendRsp(pMsg, code); } @@ -156,7 +156,7 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp if ((pMsg->msgType == TDMT_SCH_QUERY) && (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS)) { terrno = TSDB_CODE_GRANT_EXPIRED; code = terrno; - dDebug("vgId:%d, msg:%p put into vnode-query queue failed since %s", pVnode->vgId, pMsg, terrstr()); + dDebug("vgId:%d, msg:%p put into vnode-query queue failed since %s", pVnode->vgId, pMsg, terrstr(code)); } else { vnodePreprocessQueryMsg(pVnode->pImpl, pMsg); dGTrace("vgId:%d, msg:%p put into vnode-query queue", pVnode->vgId, pMsg); @@ -179,11 +179,11 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp if (!osDataSpaceAvailable()) { terrno = TSDB_CODE_VND_NO_DISKSPACE; code = terrno; - dError("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr()); + dError("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr(code)); } else if ((pMsg->msgType == TDMT_VND_SUBMIT) && (grantCheck(TSDB_GRANT_STORAGE) != TSDB_CODE_SUCCESS)) { terrno = TSDB_CODE_VND_NO_WRITE_AUTH; code = terrno; - dDebug("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr()); + dDebug("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr(code)); } else { dGTrace("vgId:%d, msg:%p put into vnode-write queue", pVnode->vgId, pMsg); taosWriteQitem(pVnode->pWriteQ, pMsg); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 6b513f0242..801bc9c2e4 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -293,7 +293,9 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat if (pItem->maxDelay > TSDB_MAX_ROLLUP_MAX_DELAY) { pItem->maxDelay = TSDB_MAX_ROLLUP_MAX_DELAY; } + pItem->level = idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2; + taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId); smaInfo("vgId:%d, table:%" PRIi64 " level:%" PRIi8 " maxdelay:%" PRIi64 " watermark:%" PRIi64 ", finally maxdelay:%" PRIi32, TD_VID(pVnode), pRSmaInfo->suid, idx + 1, param->maxdelay[idx], param->watermark[idx], pItem->maxDelay); @@ -614,9 +616,13 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm uint64_t ts; int32_t code = qExecTaskOpt(taskInfo, pResList, &ts); if (code < 0) { - smaError("vgId:%d, qExecTask for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma), suid, - pItem->level, terrstr(code)); - goto _err; + if (code == TSDB_CODE_QRY_IN_EXEC) { + break; + } else { + smaError("vgId:%d, qExecTask for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma), suid, + pItem->level, terrstr(code)); + goto _err; + } } if (taosArrayGetSize(pResList) == 0) { @@ -630,37 +636,34 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm break; } - for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) { - SSDataBlock *output = taosArrayGetP(pResList, i); - #if 1 - char flag[10] = {0}; - snprintf(flag, 10, "level %" PRIi8, pItem->level); -// blockDebugShowDataBlocks(output, flag); -// taosArrayDestroy(pResult); + char flag[10] = {0}; + snprintf(flag, 10, "level %" PRIi8, pItem->level); + blockDebugShowDataBlocks(pResList, flag); #endif - STsdb * sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); - SSubmitReq *pReq = NULL; + STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); + SSubmitReq *pReq = NULL; - // TODO: the schema update should be handled later(TD-17965) - if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, SMA_VID(pSma), suid) < 0) { - smaError("vgId:%d, build submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s", - SMA_VID(pSma), suid, pItem->level, terrstr()); - goto _err; - } - - if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) { - taosMemoryFreeClear(pReq); - smaError("vgId:%d, process submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s", - SMA_VID(pSma), suid, pItem->level, terrstr()); - goto _err; - } - - smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " version:%" PRIi64, - SMA_VID(pSma), suid, pItem->level, output->info.version); - - taosMemoryFreeClear(pReq); + // TODO: the schema update should be handled later(TD-17965) + if (buildSubmitReqFromDataBlock(&pReq, pResList, pTSchema, SMA_VID(pSma), suid) < 0) { + smaError("vgId:%d, build submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma), + suid, pItem->level, terrstr()); + goto _err; } + + SSDataBlock *pBlk = (SSDataBlock *)taosArrayGet(pResList, 0); + if (pReq && tdProcessSubmitReq(sinkTsdb, pBlk->info.version, pReq) < 0) { + taosMemoryFreeClear(pReq); + smaError("vgId:%d, process submit req for rsma stable %" PRIi64 " level %" PRIi8 " version %" PRIi64 + " failed since %s", + SMA_VID(pSma), suid, pItem->level, terrstr()); + goto _err; + } + + smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " version %" PRIi64, SMA_VID(pSma), + suid, pItem->level); + + taosMemoryFreeClear(pReq); } taosArrayDestroy(pResList); @@ -692,15 +695,12 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType } SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx); - tdRSmaFetchAndSubmitResult(pSma, RSMA_INFO_QTASK(pInfo, idx), pItem, pInfo->pTSchema, suid, STREAM_INPUT__DATA_SUBMIT); atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE); if (smaMgmt.tmrHandle) { taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId); - } else { - ASSERT(0); } return TSDB_CODE_SUCCESS; @@ -746,7 +746,6 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) { return NULL; } - // clone the SRSmaInfo from iRsmaInfoHash to rsmaInfoHash if in committing stat SRSmaInfo *pCowRSmaInfo = NULL; // lock @@ -793,13 +792,7 @@ static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) { static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) { SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, suid); if (!pRSmaInfo) { - smaDebug("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid); - return TSDB_CODE_SUCCESS; - } - - if (!RSMA_INFO_QTASK(pRSmaInfo, 0)) { - tdReleaseRSmaInfo(pSma, pRSmaInfo); - smaDebug("vgId:%d, execute rsma, no rsma qTaskInfo for suid:%" PRIu64, SMA_VID(pSma), suid); + smaError("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid); return TSDB_CODE_SUCCESS; } @@ -1331,14 +1324,16 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { SRSmaInfo *pRSmaInfo = tdGetRSmaInfoByItem(pItem); if (RSMA_INFO_IS_DEL(pRSmaInfo)) { + smaDebug("rsma fetch task not start since rsma info already deleted, rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId, + pRSmaInfo->refId); return; } SRSmaStat *pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); if (!pStat) { - smaDebug("rsma fetch task not start since already destroyed, rsetId rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId, - pRSmaInfo->refId); + smaDebug("rsma fetch task not start since rsma stat already destroyed, rsetId:%" PRIi64 " refId:%d)", + smaMgmt.rsetId, pRSmaInfo->refId); return; } @@ -1350,8 +1345,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { case TASK_TRIGGER_STAT_PAUSED: case TASK_TRIGGER_STAT_CANCELLED: { tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); - smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data since stat is %" PRIi8 ", rsetId rsetId:%" PRIi64 - " refId:%d", + smaDebug("vgId:%d, rsma fetch task not start for level %" PRIi8 " since stat is %" PRIi8 + ", rsetId rsetId:%" PRIi64 " refId:%d", SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pRSmaInfo->refId); if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) { taosTmrReset(tdRSmaFetchTrigger, 5000, pItem, smaMgmt.tmrHandle, &pItem->tmrId); @@ -1366,30 +1361,31 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE); switch (fetchTriggerStat) { case TASK_TRIGGER_STAT_ACTIVE: { - smaDebug("vgId:%d, fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is active", SMA_VID(pSma), - pItem->level, pRSmaInfo->suid); + smaDebug("vgId:%d, rsma fetch task started for level:%" PRIi8 " suid:%" PRIi64 " since stat is active", + SMA_VID(pSma), pItem->level, pRSmaInfo->suid); // async process tdRSmaFetchSend(pSma, pRSmaInfo, pItem->level); } break; case TASK_TRIGGER_STAT_PAUSED: { - smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is paused", + smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is paused", SMA_VID(pSma), pItem->level, pRSmaInfo->suid); } break; case TASK_TRIGGER_STAT_INACTIVE: { - smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is inactive", + smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is inactive", SMA_VID(pSma), pItem->level, pRSmaInfo->suid); } break; case TASK_TRIGGER_STAT_INIT: { - smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is init", SMA_VID(pSma), - pItem->level, pRSmaInfo->suid); + smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid::%" PRIi64 " since stat is init", + SMA_VID(pSma), pItem->level, pRSmaInfo->suid); } break; default: { - smaWarn("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is unknown", + smaWarn("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is unknown", SMA_VID(pSma), pItem->level, pRSmaInfo->suid); } break; } _end: + taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId); tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); } @@ -1402,7 +1398,7 @@ _end: * @return int32_t */ int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) { - SRSmaFetchMsg fetchMsg = { .suid = pInfo->suid, .level = level}; + SRSmaFetchMsg fetchMsg = {.suid = pInfo->suid, .level = level}; int32_t ret = 0; int32_t contLen = 0; SEncoder encoder = {0}; @@ -1431,7 +1427,7 @@ int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) { .contLen = contLen, }; - if ((terrno = tmsgPutToQueue(&pSma->pVnode->msgCb, FETCH_QUEUE, &rpcMsg)) != 0) { + if ((terrno = tmsgPutToQueue(&pSma->pVnode->msgCb, QUERY_QUEUE, &rpcMsg)) != 0) { smaError("vgId:%d, failed to put rsma fetch msg into fetch-queue for suid:%" PRIi64 " level:%" PRIi8 " since %s", SMA_VID(pSma), pInfo->suid, level, terrstr()); goto _err; @@ -1462,7 +1458,7 @@ int32_t smaProcessFetch(SSma *pSma, void *pMsg) { if (!pRpcMsg || pRpcMsg->contLen < sizeof(SMsgHead)) { terrno = TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP; - return -1; + goto _err; } pBuf = POINTER_SHIFT(pRpcMsg->pCont, sizeof(SMsgHead)); @@ -1479,7 +1475,7 @@ int32_t smaProcessFetch(SSma *pSma, void *pMsg) { terrno = TSDB_CODE_RSMA_EMPTY_INFO; } smaWarn("vgId:%d, failed to process rsma fetch msg for suid:%" PRIi64 " level:%" PRIi8 " since %s", SMA_VID(pSma), - req.suid, req.level, terrstr()); + req.suid, req.level, terrstr()); goto _err; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 1f13bde0c1..2c0c7f7c88 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -293,6 +293,8 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0); case TDMT_SCH_QUERY_CONTINUE: return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0); + case TDMT_VND_FETCH_RSMA: + return smaProcessFetch(pVnode->pSma, pMsg); default: vError("unknown msg type:%d in query queue", pMsg->msgType); return TSDB_CODE_VND_APP_ERROR; @@ -329,8 +331,6 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { return vnodeGetTableCfg(pVnode, pMsg, true); case TDMT_VND_BATCH_META: return vnodeGetBatchMeta(pVnode, pMsg); - case TDMT_VND_FETCH_RSMA: - return smaProcessFetch(pVnode->pSma, pMsg); case TDMT_VND_CONSUME: return tqProcessPollReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_RUN: diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 1fc0f5ff51..fe278881be 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -507,7 +507,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { *pRes = NULL; int64_t curOwner = 0; if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) { - qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner); + qDebug("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner); pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC; return pTaskInfo->code; } @@ -529,7 +529,6 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { cleanUpUdfs(); qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code)); atomic_store_64(&pTaskInfo->owner, 0); - return pTaskInfo->code; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 8a3d0620ed..c2a467a2cb 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -119,7 +119,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DISCONNECTED, "Disconnected from ser TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_WRITE_AUTH, "No write permission") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_CONN_KILLED, "Connection killed") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_SQL_SYNTAX_ERROR, "Syntax error in SQL") -TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DB_NOT_SELECTED, "Database not specified or available") +TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DB_NOT_SELECTED, "TSC:Database not specified or available") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_TABLE_NAME, "Table does not exist") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_EXCEED_SQL_LIMIT, "SQL statement too long, check maxSQLLength config") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_FILE_EMPTY, "File is empty") @@ -220,7 +220,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_COMMENT, "Invalid func comment" TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_RETRIEVE, "Invalid func retrieve msg") // mnode-db -TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_NOT_SELECTED, "Database not specified or available") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_NOT_SELECTED, "MND:Database not specified or available") TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_ALREADY_EXIST, "Database already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB_OPTION, "Invalid database options") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB, "Invalid database name") From 6cd2e52d1a5acf1f731dfd4a00b3a05c65fd9275 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Thu, 11 Aug 2022 14:43:17 +0800 Subject: [PATCH 2/9] enh: code optimization --- source/common/src/tdatablock.c | 3 ++- source/dnode/vnode/src/sma/smaRollup.c | 2 +- source/libs/executor/src/executor.c | 2 +- source/util/src/terror.c | 4 ++-- 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index bc05f7a1c3..58ea3eaf4e 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1713,7 +1713,7 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) { char pBuf[128] = {0}; int32_t sz = taosArrayGetSize(dataBlocks); for (int32_t i = 0; i < sz; i++) { - SSDataBlock* pDataBlock = taosArrayGet(dataBlocks, i); + SSDataBlock* pDataBlock = taosArrayGetP(dataBlocks, i); size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); int32_t rows = pDataBlock->info.rows; @@ -1890,6 +1890,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks *pReq = taosMemoryCalloc(1, bufSize); if (!(*pReq)) { terrno = TSDB_CODE_OUT_OF_MEMORY; + uError("buildSubmitReqFromDataBlock for table:%" PRIi64 " bufSize:%d failed since %s", suid, bufSize, terrstr()); return TSDB_CODE_FAILED; } void* pDataBuf = *pReq; diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 801bc9c2e4..bc71851160 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -636,7 +636,7 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm break; } -#if 1 +#if 0 char flag[10] = {0}; snprintf(flag, 10, "level %" PRIi8, pItem->level); blockDebugShowDataBlocks(pResList, flag); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index fe278881be..7115ad85a5 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -507,7 +507,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { *pRes = NULL; int64_t curOwner = 0; if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) { - qDebug("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner); + qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner); pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC; return pTaskInfo->code; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index c2a467a2cb..8a3d0620ed 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -119,7 +119,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DISCONNECTED, "Disconnected from ser TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_WRITE_AUTH, "No write permission") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_CONN_KILLED, "Connection killed") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_SQL_SYNTAX_ERROR, "Syntax error in SQL") -TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DB_NOT_SELECTED, "TSC:Database not specified or available") +TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DB_NOT_SELECTED, "Database not specified or available") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_TABLE_NAME, "Table does not exist") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_EXCEED_SQL_LIMIT, "SQL statement too long, check maxSQLLength config") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_FILE_EMPTY, "File is empty") @@ -220,7 +220,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_COMMENT, "Invalid func comment" TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_RETRIEVE, "Invalid func retrieve msg") // mnode-db -TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_NOT_SELECTED, "MND:Database not specified or available") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_NOT_SELECTED, "Database not specified or available") TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_ALREADY_EXIST, "Database already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB_OPTION, "Invalid database options") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB, "Invalid database name") From 113b29ed12e8b90ff8286d90ccef4eb1222636e9 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Thu, 11 Aug 2022 15:03:10 +0800 Subject: [PATCH 3/9] fix: taos array get pointer --- source/common/src/tdatablock.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 58ea3eaf4e..26598ba279 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1880,7 +1880,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks int32_t sz = taosArrayGetSize(pDataBlocks); int32_t bufSize = sizeof(SSubmitReq); for (int32_t i = 0; i < sz; ++i) { - SDataBlockInfo* pBlkInfo = &((SSDataBlock*)taosArrayGet(pDataBlocks, i))->info; + SDataBlockInfo* pBlkInfo = &((SSDataBlock*)taosArrayGetP(pDataBlocks, i))->info; int32_t numOfCols = taosArrayGetSize(pDataBlocks); bufSize += pBlkInfo->rows * (TD_ROW_HEAD_LEN + pBlkInfo->rowSize + BitmapLen(numOfCols)); @@ -1890,7 +1890,6 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks *pReq = taosMemoryCalloc(1, bufSize); if (!(*pReq)) { terrno = TSDB_CODE_OUT_OF_MEMORY; - uError("buildSubmitReqFromDataBlock for table:%" PRIi64 " bufSize:%d failed since %s", suid, bufSize, terrstr()); return TSDB_CODE_FAILED; } void* pDataBuf = *pReq; @@ -1901,7 +1900,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks tdSRowInit(&rb, pTSchema->version); for (int32_t i = 0; i < sz; ++i) { - SSDataBlock* pDataBlock = taosArrayGet(pDataBlocks, i); + SSDataBlock* pDataBlock = taosArrayGetP(pDataBlocks, i); int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); int32_t rows = pDataBlock->info.rows; // int32_t rowSize = pDataBlock->info.rowSize; From d71e238d48810ae7017a9fed2ca723b22d1ac74c Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Thu, 11 Aug 2022 15:25:31 +0800 Subject: [PATCH 4/9] fix: code optimization --- source/common/src/tdatablock.c | 1 + source/libs/executor/src/tsimplehash.c | 9 +++++---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 26598ba279..c5bcab4e76 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1998,6 +1998,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks } offset += TYPE_BYTES[pCol->type]; // sum/avg would convert to int64_t/uint64_t/double during aggregation } + tdSRowEnd(&rb); dataLen += TD_ROW_LEN(rb.pBuf); #ifdef TD_DEBUG_PRINT_ROW tdSRowPrint(rb.pBuf, pTSchema, __func__); diff --git a/source/libs/executor/src/tsimplehash.c b/source/libs/executor/src/tsimplehash.c index e709643af9..5ba2be1f21 100644 --- a/source/libs/executor/src/tsimplehash.c +++ b/source/libs/executor/src/tsimplehash.c @@ -137,13 +137,13 @@ static void taosHashTableResize(SSHashObj *pHashObj) { continue; } - SHNode *pNext; + SHNode *pNext = NULL; SHNode *pPrev = NULL; while (pNode != NULL) { void *key = GET_SHASH_NODE_KEY(pNode, pHashObj->dataLen); - uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)pHashObj->dataLen); + uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)pHashObj->keyLen); int32_t newIdx = HASH_INDEX(hashVal, pHashObj->capacity); pNext = pNode->next; @@ -187,12 +187,13 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, const void *data) { SHNode *pNode = pHashObj->hashList[slot]; if (pNode == NULL) { - SHNode *pNewNode = doCreateHashNode(key, pHashObj->keyLen, data, pHashObj->size, hashVal); + SHNode *pNewNode = doCreateHashNode(key, pHashObj->keyLen, data, pHashObj->dataLen, hashVal); if (pNewNode == NULL) { return -1; } pHashObj->hashList[slot] = pNewNode; + atomic_add_fetch_64(&pHashObj->size, 1); return 0; } @@ -204,7 +205,7 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, const void *data) { } if (pNode == NULL) { - SHNode *pNewNode = doCreateHashNode(key, pHashObj->keyLen, data, pHashObj->size, hashVal); + SHNode *pNewNode = doCreateHashNode(key, pHashObj->keyLen, data, pHashObj->dataLen, hashVal); if (pNewNode == NULL) { return -1; } From e890bcc6f4a82250fb7db88e53099dce69421809 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Thu, 11 Aug 2022 19:12:24 +0800 Subject: [PATCH 5/9] enh: tsimplehash remove/iter/ut --- source/libs/executor/inc/tsimplehash.h | 11 ++ source/libs/executor/src/tsimplehash.c | 116 ++++++++++++++---- source/libs/executor/test/CMakeLists.txt | 17 ++- .../libs/executor/test/tSimpleHashTests.cpp | 73 +++++++++++ 4 files changed, 193 insertions(+), 24 deletions(-) create mode 100644 source/libs/executor/test/tSimpleHashTests.cpp diff --git a/source/libs/executor/inc/tsimplehash.h b/source/libs/executor/inc/tsimplehash.h index a1ba70c702..a56f8e8c04 100644 --- a/source/libs/executor/inc/tsimplehash.h +++ b/source/libs/executor/inc/tsimplehash.h @@ -45,6 +45,8 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn, size_t keyLen, size_t */ int32_t tSimpleHashGetSize(const SSHashObj *pHashObj); +int32_t tSimpleHashPrint(const SSHashObj *pHashObj); + /** * put element into hash table, if the element with the same key exists, update it * @param pHashObj @@ -98,6 +100,15 @@ size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj); */ void *tSimpleHashGetKey(const SSHashObj* pHashObj, void *data, size_t* keyLen); +/** + * Create the hash table iterator + * @param pHashObj + * @param data + * @param iter + * @return void* + */ +void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter); + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/tsimplehash.c b/source/libs/executor/src/tsimplehash.c index 5ba2be1f21..7989ad2b5a 100644 --- a/source/libs/executor/src/tsimplehash.c +++ b/source/libs/executor/src/tsimplehash.c @@ -62,7 +62,7 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn, size_t keyLen, size_t } SSHashObj *pHashObj = (SSHashObj *)taosMemoryCalloc(1, sizeof(SSHashObj)); - if (pHashObj == NULL) { + if (!pHashObj) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -78,7 +78,7 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn, size_t keyLen, size_t pHashObj->dataLen = dataLen; pHashObj->hashList = (SHNode **)taosMemoryCalloc(pHashObj->capacity, sizeof(void *)); - if (pHashObj->hashList == NULL) { + if (!pHashObj->hashList) { taosMemoryFree(pHashObj); terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -87,7 +87,7 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn, size_t keyLen, size_t } int32_t tSimpleHashGetSize(const SSHashObj *pHashObj) { - if (pHashObj == NULL) { + if (!pHashObj) { return 0; } return (int32_t)atomic_load_64((int64_t *)&pHashObj->size); @@ -95,7 +95,7 @@ int32_t tSimpleHashGetSize(const SSHashObj *pHashObj) { static SHNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal) { SHNode *pNewNode = taosMemoryMalloc(sizeof(SHNode) + keyLen + dsize); - if (pNewNode == NULL) { + if (!pNewNode) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -120,7 +120,7 @@ static void taosHashTableResize(SSHashObj *pHashObj) { int64_t st = taosGetTimestampUs(); void *pNewEntryList = taosMemoryRealloc(pHashObj->hashList, sizeof(void *) * newCapacity); - if (pNewEntryList == NULL) { + if (!pNewEntryList) { // qWarn("hash resize failed due to out of memory, capacity remain:%zu", pHashObj->capacity); return; } @@ -133,14 +133,13 @@ static void taosHashTableResize(SSHashObj *pHashObj) { for (int32_t idx = 0; idx < pHashObj->capacity; ++idx) { SHNode *pNode = pHashObj->hashList[idx]; - if (pNode == NULL) { + if (!pNode) { continue; } SHNode *pNext = NULL; SHNode *pPrev = NULL; - while (pNode != NULL) { void *key = GET_SHASH_NODE_KEY(pNode, pHashObj->dataLen); uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)pHashObj->keyLen); @@ -148,7 +147,7 @@ static void taosHashTableResize(SSHashObj *pHashObj) { int32_t newIdx = HASH_INDEX(hashVal, pHashObj->capacity); pNext = pNode->next; if (newIdx != idx) { - if (pPrev == NULL) { + if (!pPrev) { pHashObj->hashList[idx] = pNext; } else { pPrev->next = pNext; @@ -172,7 +171,7 @@ static void taosHashTableResize(SSHashObj *pHashObj) { } int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, const void *data) { - if (pHashObj == NULL || key == NULL) { + if (!pHashObj || !key) { return -1; } @@ -186,9 +185,9 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, const void *data) { int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); SHNode *pNode = pHashObj->hashList[slot]; - if (pNode == NULL) { + if (!pNode) { SHNode *pNewNode = doCreateHashNode(key, pHashObj->keyLen, data, pHashObj->dataLen, hashVal); - if (pNewNode == NULL) { + if (!pNewNode) { return -1; } @@ -204,9 +203,9 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, const void *data) { pNode = pNode->next; } - if (pNode == NULL) { + if (!pNode) { SHNode *pNewNode = doCreateHashNode(key, pHashObj->keyLen, data, pHashObj->dataLen, hashVal); - if (pNewNode == NULL) { + if (!pNewNode) { return -1; } pNewNode->next = pHashObj->hashList[slot]; @@ -235,7 +234,7 @@ static FORCE_INLINE SHNode *doSearchInEntryList(SSHashObj *pHashObj, const void static FORCE_INLINE bool taosHashTableEmpty(const SSHashObj *pHashObj) { return tSimpleHashGetSize(pHashObj) == 0; } void *tSimpleHashGet(SSHashObj *pHashObj, const void *key) { - if (pHashObj == NULL || taosHashTableEmpty(pHashObj) || key == NULL) { + if (!pHashObj || taosHashTableEmpty(pHashObj) || !key) { return NULL; } @@ -243,7 +242,7 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key) { int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); SHNode *pNode = pHashObj->hashList[slot]; - if (pNode == NULL) { + if (!pNode) { return NULL; } @@ -257,19 +256,43 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key) { } int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key) { - // todo + if (!pHashObj || !key) { + return TSDB_CODE_FAILED; + } + + uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)pHashObj->keyLen); + + int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); + + SHNode *pNode = pHashObj->hashList[slot]; + SHNode *pPrev = NULL; + while (pNode) { + if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pHashObj->dataLen), key, pHashObj->keyLen) == 0) { + if (!pPrev) { + pHashObj->hashList[slot] = pNode->next; + } else { + pPrev->next = pNode->next; + } + FREE_HASH_NODE(pNode); + atomic_sub_fetch_64(&pHashObj->size, 1); + break; + } + pPrev = pNode; + pNode = pNode->next; + } + return TSDB_CODE_SUCCESS; } void tSimpleHashClear(SSHashObj *pHashObj) { - if (pHashObj == NULL) { + if (!pHashObj || taosHashTableEmpty(pHashObj)) { return; } - SHNode *pNode, *pNext; + SHNode *pNode = NULL, *pNext = NULL; for (int32_t i = 0; i < pHashObj->capacity; ++i) { pNode = pHashObj->hashList[i]; - if (pNode == NULL) { + if (!pNode) { continue; } @@ -279,11 +302,11 @@ void tSimpleHashClear(SSHashObj *pHashObj) { pNode = pNext; } } - pHashObj->size = 0; + atomic_store_64(&pHashObj->size, 0); } void tSimpleHashCleanup(SSHashObj *pHashObj) { - if (pHashObj == NULL) { + if (!pHashObj) { return; } @@ -292,7 +315,7 @@ void tSimpleHashCleanup(SSHashObj *pHashObj) { } size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj) { - if (pHashObj == NULL) { + if (!pHashObj) { return 0; } @@ -300,11 +323,58 @@ size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj) { } void *tSimpleHashGetKey(const SSHashObj *pHashObj, void *data, size_t *keyLen) { +#if 0 int32_t offset = offsetof(SHNode, data); SHNode *node = ((SHNode *)(char *)data - offset); - if (keyLen != NULL) { + if (keyLen) { *keyLen = pHashObj->keyLen; } + return POINTER_SHIFT(data, pHashObj->dataLen); + return GET_SHASH_NODE_KEY(node, pHashObj->dataLen); +#endif + if (keyLen) { + *keyLen = pHashObj->keyLen; + } + + return POINTER_SHIFT(data, pHashObj->dataLen); +} + +void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) { + if (!pHashObj) { + return NULL; + } + + SHNode *pNode = NULL; + + if (!data) { + for (int32_t i = 0; i < pHashObj->capacity; ++i) { + pNode = pHashObj->hashList[i]; + if (!pNode) { + continue; + } + *iter = i; + return GET_SHASH_NODE_DATA(pNode); + } + return NULL; + } + + pNode = (SHNode *)((char *)data - offsetof(SHNode, data)); + + if (pNode->next) { + return GET_SHASH_NODE_DATA(pNode->next); + } + + ++(*iter); + for (int32_t i = *iter; i < pHashObj->capacity; ++i) { + pNode = pHashObj->hashList[i]; + if (!pNode) { + continue; + } + *iter = i; + return GET_SHASH_NODE_DATA(pNode); + } + + return NULL; } \ No newline at end of file diff --git a/source/libs/executor/test/CMakeLists.txt b/source/libs/executor/test/CMakeLists.txt index acab27ec08..18ca954813 100644 --- a/source/libs/executor/test/CMakeLists.txt +++ b/source/libs/executor/test/CMakeLists.txt @@ -17,4 +17,19 @@ IF(NOT TD_DARWIN) PUBLIC "${TD_SOURCE_DIR}/include/libs/executor/" PRIVATE "${TD_SOURCE_DIR}/source/libs/executor/inc" ) -ENDIF () \ No newline at end of file +ENDIF () + +# SET(CMAKE_CXX_STANDARD 11) +# AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) + +# ADD_EXECUTABLE(tSimpleHashTest tSimpleHashTests.cpp) +# TARGET_LINK_LIBRARIES( +# tSimpleHashTest +# PRIVATE os util common executor gtest_main +# ) + +# TARGET_INCLUDE_DIRECTORIES( +# tSimpleHashTest +# PUBLIC "${TD_SOURCE_DIR}/include/common" +# PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +# ) \ No newline at end of file diff --git a/source/libs/executor/test/tSimpleHashTests.cpp b/source/libs/executor/test/tSimpleHashTests.cpp new file mode 100644 index 0000000000..e9284d2477 --- /dev/null +++ b/source/libs/executor/test/tSimpleHashTests.cpp @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include "taos.h" +#include "thash.h" +#include "tsimplehash.h" + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wwrite-strings" +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" + +int main(int argc, char **argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +TEST(testCase, tSimpleHashTest) { + SSHashObj *pHashObj = + tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), sizeof(int64_t), sizeof(int64_t)); + + assert(pHashObj != nullptr); + + ASSERT_EQ(0, tSimpleHashGetSize(pHashObj)); + + int64_t originKeySum = 0; + for (int64_t i = 1; i <= 100; ++i) { + originKeySum += i; + tSimpleHashPut(pHashObj, (const void *)&i, (const void *)&i); + ASSERT_EQ(i, tSimpleHashGetSize(pHashObj)); + } + + for (int64_t i = 1; i <= 100; ++i) { + void *data = tSimpleHashGet(pHashObj, (const void *)&i); + ASSERT_EQ(i, *(int64_t *)data); + } + + + void *data = NULL; + int32_t iter = 0; + int64_t keySum = 0; + int64_t dataSum = 0; + while ((data = tSimpleHashIterate(pHashObj, data, &iter))) { + void *key = tSimpleHashGetKey(pHashObj, data, NULL); + keySum += *(int64_t *)key; + dataSum += *(int64_t *)data; + } + + ASSERT_EQ(keySum, dataSum); + ASSERT_EQ(keySum, originKeySum); + + for (int64_t i = 1; i <= 100; ++i) { + tSimpleHashRemove(pHashObj, (const void *)&i); + ASSERT_EQ(100 - i, tSimpleHashGetSize(pHashObj)); + } +} + +#pragma GCC diagnostic pop \ No newline at end of file From 133beb4d491e94039020cee8e80074476f91df77 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Thu, 11 Aug 2022 19:18:55 +0800 Subject: [PATCH 6/9] other: make test case pass --- source/libs/executor/test/tSimpleHashTests.cpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/source/libs/executor/test/tSimpleHashTests.cpp b/source/libs/executor/test/tSimpleHashTests.cpp index e9284d2477..a17a7146ea 100644 --- a/source/libs/executor/test/tSimpleHashTests.cpp +++ b/source/libs/executor/test/tSimpleHashTests.cpp @@ -25,10 +25,10 @@ #pragma GCC diagnostic ignored "-Wunused-variable" #pragma GCC diagnostic ignored "-Wsign-compare" -int main(int argc, char **argv) { - testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} +// int main(int argc, char **argv) { +// testing::InitGoogleTest(&argc, argv); +// return RUN_ALL_TESTS(); +// } TEST(testCase, tSimpleHashTest) { SSHashObj *pHashObj = @@ -68,6 +68,8 @@ TEST(testCase, tSimpleHashTest) { tSimpleHashRemove(pHashObj, (const void *)&i); ASSERT_EQ(100 - i, tSimpleHashGetSize(pHashObj)); } + + tSimpleHashCleanup(pHashObj); } #pragma GCC diagnostic pop \ No newline at end of file From d9c10a2f0fa7e080ca3eb986da3f51077bc623d7 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Thu, 11 Aug 2022 19:59:47 +0800 Subject: [PATCH 7/9] other: rsma submit --- include/common/tdatablock.h | 2 +- source/common/src/tdatablock.c | 11 +++--- source/dnode/vnode/src/sma/smaRollup.c | 49 ++++++++++++++------------ 3 files changed, 32 insertions(+), 30 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index c77cb16e86..410fa02ded 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -246,7 +246,7 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag); // for debug char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf); -int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, +int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataBlocks, STSchema* pTSchema, int32_t vgId, tb_uid_t suid); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index c5bcab4e76..8489627721 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1875,15 +1875,15 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) * @param suid * */ -int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, +int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataBlock, STSchema* pTSchema, int32_t vgId, tb_uid_t suid) { - int32_t sz = taosArrayGetSize(pDataBlocks); int32_t bufSize = sizeof(SSubmitReq); + int32_t sz = 1; for (int32_t i = 0; i < sz; ++i) { - SDataBlockInfo* pBlkInfo = &((SSDataBlock*)taosArrayGetP(pDataBlocks, i))->info; + const SDataBlockInfo* pBlkInfo = &pDataBlock->info; - int32_t numOfCols = taosArrayGetSize(pDataBlocks); - bufSize += pBlkInfo->rows * (TD_ROW_HEAD_LEN + pBlkInfo->rowSize + BitmapLen(numOfCols)); + int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); + bufSize += pBlkInfo->rows * (TD_ROW_HEAD_LEN + pBlkInfo->rowSize + BitmapLen(colNum)); bufSize += sizeof(SSubmitBlk); } @@ -1900,7 +1900,6 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks tdSRowInit(&rb, pTSchema->version); for (int32_t i = 0; i < sz; ++i) { - SSDataBlock* pDataBlock = taosArrayGetP(pDataBlocks, i); int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); int32_t rows = pDataBlock->info.rows; // int32_t rowSize = pDataBlock->info.rowSize; diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index bc71851160..b7a2efd489 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -615,7 +615,7 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm while (1) { uint64_t ts; int32_t code = qExecTaskOpt(taskInfo, pResList, &ts); - if (code < 0) { + if (code < 0) { if (code == TSDB_CODE_QRY_IN_EXEC) { break; } else { @@ -627,43 +627,46 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm if (taosArrayGetSize(pResList) == 0) { if (terrno == 0) { - smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched yet", SMA_VID(pSma), pItem->level); + // smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched yet", SMA_VID(pSma), pItem->level); } else { smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched since %s", SMA_VID(pSma), pItem->level, terrstr()); goto _err; } break; + } else { + smaDebug("vgId:%d, rsma %" PRIi8 " data fetched", SMA_VID(pSma), pItem->level); } -#if 0 +#if 1 char flag[10] = {0}; snprintf(flag, 10, "level %" PRIi8, pItem->level); blockDebugShowDataBlocks(pResList, flag); #endif - STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); - SSubmitReq *pReq = NULL; + for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) { + SSDataBlock *output = taosArrayGetP(pResList, i); + STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); + SSubmitReq *pReq = NULL; - // TODO: the schema update should be handled later(TD-17965) - if (buildSubmitReqFromDataBlock(&pReq, pResList, pTSchema, SMA_VID(pSma), suid) < 0) { - smaError("vgId:%d, build submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma), - suid, pItem->level, terrstr()); - goto _err; - } + // TODO: the schema update should be handled later(TD-17965) + if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, SMA_VID(pSma), suid) < 0) { + smaError("vgId:%d, build submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s", + SMA_VID(pSma), suid, pItem->level, terrstr()); + goto _err; + } - SSDataBlock *pBlk = (SSDataBlock *)taosArrayGet(pResList, 0); - if (pReq && tdProcessSubmitReq(sinkTsdb, pBlk->info.version, pReq) < 0) { + if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) { + taosMemoryFreeClear(pReq); + smaError("vgId:%d, process submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s", + SMA_VID(pSma), suid, pItem->level, terrstr()); + goto _err; + } taosMemoryFreeClear(pReq); - smaError("vgId:%d, process submit req for rsma stable %" PRIi64 " level %" PRIi8 " version %" PRIi64 - " failed since %s", - SMA_VID(pSma), suid, pItem->level, terrstr()); - goto _err; + + smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " version:%" PRIi64, + SMA_VID(pSma), suid, pItem->level, output->info.version); + } - - smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " version %" PRIi64, SMA_VID(pSma), - suid, pItem->level); - - taosMemoryFreeClear(pReq); } taosArrayDestroy(pResList); @@ -1385,7 +1388,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { } _end: - taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId); + // taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId); tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); } From cfca841567e7733aff38afac1c78e46c5423bcdb Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Thu, 11 Aug 2022 21:06:17 +0800 Subject: [PATCH 8/9] fix: comment out tsma debug print --- source/dnode/vnode/src/vnd/vnodeSvr.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 2c0c7f7c88..7a1cc04cdc 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -357,7 +357,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { // TODO: remove the function void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { // TODO - blockDebugShowDataBlocks(data, __func__); + // blockDebugShowDataBlocks(data, __func__); tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data); } From 64a92ff8397d2e28982459697e8660044d511d58 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Thu, 11 Aug 2022 21:31:34 +0800 Subject: [PATCH 9/9] test: tsma case optimization --- tests/script/tsim/sma/tsmaCreateInsertQuery.sim | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/script/tsim/sma/tsmaCreateInsertQuery.sim b/tests/script/tsim/sma/tsmaCreateInsertQuery.sim index e3b38d415c..718ff58ed4 100644 --- a/tests/script/tsim/sma/tsmaCreateInsertQuery.sim +++ b/tests/script/tsim/sma/tsmaCreateInsertQuery.sim @@ -61,6 +61,7 @@ endi print =============== select * from stb from memory in designated vgroup sql select _wstart, _wend, min(c1),max(c2),max(c1) from stb interval(5m,10s) sliding(5m); print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 if $rows != 1 then print rows $rows != 1 return -1