diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 5b39bc854e..410fa02ded 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -249,6 +249,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf); int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataBlocks, STSchema* pTSchema, int32_t vgId, tb_uid_t suid); + char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) { diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index dba30bb876..8489627721 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1713,7 +1713,7 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) { char pBuf[128] = {0}; int32_t sz = taosArrayGetSize(dataBlocks); for (int32_t i = 0; i < sz; i++) { - SSDataBlock* pDataBlock = taosArrayGet(dataBlocks, i); + SSDataBlock* pDataBlock = taosArrayGetP(dataBlocks, i); size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); int32_t rows = pDataBlock->info.rows; @@ -1870,10 +1870,10 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) * @brief TODO: Assume that the final generated result it less than 3M * * @param pReq - * @param pDataBlock + * @param pDataBlocks * @param vgId * @param suid - * + * */ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataBlock, STSchema* pTSchema, int32_t vgId, tb_uid_t suid) { diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index d13daffa08..7c6807ab87 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -337,6 +337,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; @@ -347,7 +348,6 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSMA, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 1a226abe5c..0a42f06081 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -54,7 +54,7 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { if (IsReq(pMsg)) { if (code != 0) { if (terrno != 0) code = terrno; - dGError("msg:%p, failed to process since %s", pMsg, terrstr()); + dGError("msg:%p, failed to process since %s", pMsg, terrstr(code)); } vmSendRsp(pMsg, code); } @@ -72,7 +72,7 @@ static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, pMsg); if (code != 0) { if (terrno != 0) code = terrno; - dGError("vgId:%d, msg:%p failed to query since %s", pVnode->vgId, pMsg, terrstr()); + dGError("vgId:%d, msg:%p failed to query since %s", pVnode->vgId, pMsg, terrstr(code)); vmSendRsp(pMsg, code); } @@ -89,7 +89,7 @@ static void vmProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo); if (code != 0) { if (terrno != 0) code = terrno; - dGError("vgId:%d, msg:%p failed to process stream since %s", pVnode->vgId, pMsg, terrstr()); + dGError("vgId:%d, msg:%p failed to process stream since %s", pVnode->vgId, pMsg, terrstr(code)); vmSendRsp(pMsg, code); } @@ -110,7 +110,7 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo); if (code != 0) { if (terrno != 0) code = terrno; - dGError("vgId:%d, msg:%p failed to fetch since %s", pVnode->vgId, pMsg, terrstr()); + dGError("vgId:%d, msg:%p failed to fetch since %s", pVnode->vgId, pMsg, terrstr(code)); vmSendRsp(pMsg, code); } @@ -156,7 +156,7 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp if ((pMsg->msgType == TDMT_SCH_QUERY) && (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS)) { terrno = TSDB_CODE_GRANT_EXPIRED; code = terrno; - dDebug("vgId:%d, msg:%p put into vnode-query queue failed since %s", pVnode->vgId, pMsg, terrstr()); + dDebug("vgId:%d, msg:%p put into vnode-query queue failed since %s", pVnode->vgId, pMsg, terrstr(code)); } else { vnodePreprocessQueryMsg(pVnode->pImpl, pMsg); dGTrace("vgId:%d, msg:%p put into vnode-query queue", pVnode->vgId, pMsg); @@ -179,11 +179,11 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp if (!osDataSpaceAvailable()) { terrno = TSDB_CODE_VND_NO_DISKSPACE; code = terrno; - dError("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr()); + dError("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr(code)); } else if ((pMsg->msgType == TDMT_VND_SUBMIT) && (grantCheck(TSDB_GRANT_STORAGE) != TSDB_CODE_SUCCESS)) { terrno = TSDB_CODE_VND_NO_WRITE_AUTH; code = terrno; - dDebug("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr()); + dDebug("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr(code)); } else { dGTrace("vgId:%d, msg:%p put into vnode-write queue", pVnode->vgId, pMsg); taosWriteQitem(pVnode->pWriteQ, pMsg); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 6b513f0242..b7a2efd489 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -293,7 +293,9 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat if (pItem->maxDelay > TSDB_MAX_ROLLUP_MAX_DELAY) { pItem->maxDelay = TSDB_MAX_ROLLUP_MAX_DELAY; } + pItem->level = idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2; + taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId); smaInfo("vgId:%d, table:%" PRIi64 " level:%" PRIi8 " maxdelay:%" PRIi64 " watermark:%" PRIi64 ", finally maxdelay:%" PRIi32, TD_VID(pVnode), pRSmaInfo->suid, idx + 1, param->maxdelay[idx], param->watermark[idx], pItem->maxDelay); @@ -613,34 +615,38 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm while (1) { uint64_t ts; int32_t code = qExecTaskOpt(taskInfo, pResList, &ts); - if (code < 0) { - smaError("vgId:%d, qExecTask for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma), suid, - pItem->level, terrstr(code)); - goto _err; + if (code < 0) { + if (code == TSDB_CODE_QRY_IN_EXEC) { + break; + } else { + smaError("vgId:%d, qExecTask for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma), suid, + pItem->level, terrstr(code)); + goto _err; + } } if (taosArrayGetSize(pResList) == 0) { if (terrno == 0) { - smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched yet", SMA_VID(pSma), pItem->level); + // smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched yet", SMA_VID(pSma), pItem->level); } else { smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched since %s", SMA_VID(pSma), pItem->level, terrstr()); goto _err; } break; + } else { + smaDebug("vgId:%d, rsma %" PRIi8 " data fetched", SMA_VID(pSma), pItem->level); } +#if 1 + char flag[10] = {0}; + snprintf(flag, 10, "level %" PRIi8, pItem->level); + blockDebugShowDataBlocks(pResList, flag); +#endif for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) { SSDataBlock *output = taosArrayGetP(pResList, i); - -#if 1 - char flag[10] = {0}; - snprintf(flag, 10, "level %" PRIi8, pItem->level); -// blockDebugShowDataBlocks(output, flag); -// taosArrayDestroy(pResult); -#endif - STsdb * sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); - SSubmitReq *pReq = NULL; + STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); + SSubmitReq *pReq = NULL; // TODO: the schema update should be handled later(TD-17965) if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, SMA_VID(pSma), suid) < 0) { @@ -655,11 +661,11 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm SMA_VID(pSma), suid, pItem->level, terrstr()); goto _err; } - + taosMemoryFreeClear(pReq); + smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " version:%" PRIi64, SMA_VID(pSma), suid, pItem->level, output->info.version); - taosMemoryFreeClear(pReq); } } @@ -692,15 +698,12 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType } SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx); - tdRSmaFetchAndSubmitResult(pSma, RSMA_INFO_QTASK(pInfo, idx), pItem, pInfo->pTSchema, suid, STREAM_INPUT__DATA_SUBMIT); atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE); if (smaMgmt.tmrHandle) { taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId); - } else { - ASSERT(0); } return TSDB_CODE_SUCCESS; @@ -746,7 +749,6 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) { return NULL; } - // clone the SRSmaInfo from iRsmaInfoHash to rsmaInfoHash if in committing stat SRSmaInfo *pCowRSmaInfo = NULL; // lock @@ -793,13 +795,7 @@ static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) { static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) { SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, suid); if (!pRSmaInfo) { - smaDebug("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid); - return TSDB_CODE_SUCCESS; - } - - if (!RSMA_INFO_QTASK(pRSmaInfo, 0)) { - tdReleaseRSmaInfo(pSma, pRSmaInfo); - smaDebug("vgId:%d, execute rsma, no rsma qTaskInfo for suid:%" PRIu64, SMA_VID(pSma), suid); + smaError("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid); return TSDB_CODE_SUCCESS; } @@ -1331,14 +1327,16 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { SRSmaInfo *pRSmaInfo = tdGetRSmaInfoByItem(pItem); if (RSMA_INFO_IS_DEL(pRSmaInfo)) { + smaDebug("rsma fetch task not start since rsma info already deleted, rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId, + pRSmaInfo->refId); return; } SRSmaStat *pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); if (!pStat) { - smaDebug("rsma fetch task not start since already destroyed, rsetId rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId, - pRSmaInfo->refId); + smaDebug("rsma fetch task not start since rsma stat already destroyed, rsetId:%" PRIi64 " refId:%d)", + smaMgmt.rsetId, pRSmaInfo->refId); return; } @@ -1350,8 +1348,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { case TASK_TRIGGER_STAT_PAUSED: case TASK_TRIGGER_STAT_CANCELLED: { tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); - smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data since stat is %" PRIi8 ", rsetId rsetId:%" PRIi64 - " refId:%d", + smaDebug("vgId:%d, rsma fetch task not start for level %" PRIi8 " since stat is %" PRIi8 + ", rsetId rsetId:%" PRIi64 " refId:%d", SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pRSmaInfo->refId); if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) { taosTmrReset(tdRSmaFetchTrigger, 5000, pItem, smaMgmt.tmrHandle, &pItem->tmrId); @@ -1366,30 +1364,31 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE); switch (fetchTriggerStat) { case TASK_TRIGGER_STAT_ACTIVE: { - smaDebug("vgId:%d, fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is active", SMA_VID(pSma), - pItem->level, pRSmaInfo->suid); + smaDebug("vgId:%d, rsma fetch task started for level:%" PRIi8 " suid:%" PRIi64 " since stat is active", + SMA_VID(pSma), pItem->level, pRSmaInfo->suid); // async process tdRSmaFetchSend(pSma, pRSmaInfo, pItem->level); } break; case TASK_TRIGGER_STAT_PAUSED: { - smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is paused", + smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is paused", SMA_VID(pSma), pItem->level, pRSmaInfo->suid); } break; case TASK_TRIGGER_STAT_INACTIVE: { - smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is inactive", + smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is inactive", SMA_VID(pSma), pItem->level, pRSmaInfo->suid); } break; case TASK_TRIGGER_STAT_INIT: { - smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is init", SMA_VID(pSma), - pItem->level, pRSmaInfo->suid); + smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid::%" PRIi64 " since stat is init", + SMA_VID(pSma), pItem->level, pRSmaInfo->suid); } break; default: { - smaWarn("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is unknown", + smaWarn("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is unknown", SMA_VID(pSma), pItem->level, pRSmaInfo->suid); } break; } _end: + // taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId); tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); } @@ -1402,7 +1401,7 @@ _end: * @return int32_t */ int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) { - SRSmaFetchMsg fetchMsg = { .suid = pInfo->suid, .level = level}; + SRSmaFetchMsg fetchMsg = {.suid = pInfo->suid, .level = level}; int32_t ret = 0; int32_t contLen = 0; SEncoder encoder = {0}; @@ -1431,7 +1430,7 @@ int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) { .contLen = contLen, }; - if ((terrno = tmsgPutToQueue(&pSma->pVnode->msgCb, FETCH_QUEUE, &rpcMsg)) != 0) { + if ((terrno = tmsgPutToQueue(&pSma->pVnode->msgCb, QUERY_QUEUE, &rpcMsg)) != 0) { smaError("vgId:%d, failed to put rsma fetch msg into fetch-queue for suid:%" PRIi64 " level:%" PRIi8 " since %s", SMA_VID(pSma), pInfo->suid, level, terrstr()); goto _err; @@ -1462,7 +1461,7 @@ int32_t smaProcessFetch(SSma *pSma, void *pMsg) { if (!pRpcMsg || pRpcMsg->contLen < sizeof(SMsgHead)) { terrno = TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP; - return -1; + goto _err; } pBuf = POINTER_SHIFT(pRpcMsg->pCont, sizeof(SMsgHead)); @@ -1479,7 +1478,7 @@ int32_t smaProcessFetch(SSma *pSma, void *pMsg) { terrno = TSDB_CODE_RSMA_EMPTY_INFO; } smaWarn("vgId:%d, failed to process rsma fetch msg for suid:%" PRIi64 " level:%" PRIi8 " since %s", SMA_VID(pSma), - req.suid, req.level, terrstr()); + req.suid, req.level, terrstr()); goto _err; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index ecff58f3b1..d5c5e18668 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -293,6 +293,8 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0); case TDMT_SCH_QUERY_CONTINUE: return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0); + case TDMT_VND_FETCH_RSMA: + return smaProcessFetch(pVnode->pSma, pMsg); default: vError("unknown msg type:%d in query queue", pMsg->msgType); return TSDB_CODE_VND_APP_ERROR; @@ -329,8 +331,6 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { return vnodeGetTableCfg(pVnode, pMsg, true); case TDMT_VND_BATCH_META: return vnodeGetBatchMeta(pVnode, pMsg); - case TDMT_VND_FETCH_RSMA: - return smaProcessFetch(pVnode->pSma, pMsg); case TDMT_VND_CONSUME: return tqProcessPollReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_RUN: @@ -357,7 +357,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { // TODO: remove the function void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { // TODO - blockDebugShowDataBlocks(data, __func__); + // blockDebugShowDataBlocks(data, __func__); tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data); } diff --git a/source/libs/executor/inc/tsimplehash.h b/source/libs/executor/inc/tsimplehash.h index a1ba70c702..a56f8e8c04 100644 --- a/source/libs/executor/inc/tsimplehash.h +++ b/source/libs/executor/inc/tsimplehash.h @@ -45,6 +45,8 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn, size_t keyLen, size_t */ int32_t tSimpleHashGetSize(const SSHashObj *pHashObj); +int32_t tSimpleHashPrint(const SSHashObj *pHashObj); + /** * put element into hash table, if the element with the same key exists, update it * @param pHashObj @@ -98,6 +100,15 @@ size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj); */ void *tSimpleHashGetKey(const SSHashObj* pHashObj, void *data, size_t* keyLen); +/** + * Create the hash table iterator + * @param pHashObj + * @param data + * @param iter + * @return void* + */ +void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter); + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 1fc0f5ff51..7115ad85a5 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -529,7 +529,6 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { cleanUpUdfs(); qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code)); atomic_store_64(&pTaskInfo->owner, 0); - return pTaskInfo->code; } diff --git a/source/libs/executor/src/tsimplehash.c b/source/libs/executor/src/tsimplehash.c index e709643af9..7989ad2b5a 100644 --- a/source/libs/executor/src/tsimplehash.c +++ b/source/libs/executor/src/tsimplehash.c @@ -62,7 +62,7 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn, size_t keyLen, size_t } SSHashObj *pHashObj = (SSHashObj *)taosMemoryCalloc(1, sizeof(SSHashObj)); - if (pHashObj == NULL) { + if (!pHashObj) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -78,7 +78,7 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn, size_t keyLen, size_t pHashObj->dataLen = dataLen; pHashObj->hashList = (SHNode **)taosMemoryCalloc(pHashObj->capacity, sizeof(void *)); - if (pHashObj->hashList == NULL) { + if (!pHashObj->hashList) { taosMemoryFree(pHashObj); terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -87,7 +87,7 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn, size_t keyLen, size_t } int32_t tSimpleHashGetSize(const SSHashObj *pHashObj) { - if (pHashObj == NULL) { + if (!pHashObj) { return 0; } return (int32_t)atomic_load_64((int64_t *)&pHashObj->size); @@ -95,7 +95,7 @@ int32_t tSimpleHashGetSize(const SSHashObj *pHashObj) { static SHNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal) { SHNode *pNewNode = taosMemoryMalloc(sizeof(SHNode) + keyLen + dsize); - if (pNewNode == NULL) { + if (!pNewNode) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -120,7 +120,7 @@ static void taosHashTableResize(SSHashObj *pHashObj) { int64_t st = taosGetTimestampUs(); void *pNewEntryList = taosMemoryRealloc(pHashObj->hashList, sizeof(void *) * newCapacity); - if (pNewEntryList == NULL) { + if (!pNewEntryList) { // qWarn("hash resize failed due to out of memory, capacity remain:%zu", pHashObj->capacity); return; } @@ -133,22 +133,21 @@ static void taosHashTableResize(SSHashObj *pHashObj) { for (int32_t idx = 0; idx < pHashObj->capacity; ++idx) { SHNode *pNode = pHashObj->hashList[idx]; - if (pNode == NULL) { + if (!pNode) { continue; } - SHNode *pNext; + SHNode *pNext = NULL; SHNode *pPrev = NULL; - while (pNode != NULL) { void *key = GET_SHASH_NODE_KEY(pNode, pHashObj->dataLen); - uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)pHashObj->dataLen); + uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)pHashObj->keyLen); int32_t newIdx = HASH_INDEX(hashVal, pHashObj->capacity); pNext = pNode->next; if (newIdx != idx) { - if (pPrev == NULL) { + if (!pPrev) { pHashObj->hashList[idx] = pNext; } else { pPrev->next = pNext; @@ -172,7 +171,7 @@ static void taosHashTableResize(SSHashObj *pHashObj) { } int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, const void *data) { - if (pHashObj == NULL || key == NULL) { + if (!pHashObj || !key) { return -1; } @@ -186,13 +185,14 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, const void *data) { int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); SHNode *pNode = pHashObj->hashList[slot]; - if (pNode == NULL) { - SHNode *pNewNode = doCreateHashNode(key, pHashObj->keyLen, data, pHashObj->size, hashVal); - if (pNewNode == NULL) { + if (!pNode) { + SHNode *pNewNode = doCreateHashNode(key, pHashObj->keyLen, data, pHashObj->dataLen, hashVal); + if (!pNewNode) { return -1; } pHashObj->hashList[slot] = pNewNode; + atomic_add_fetch_64(&pHashObj->size, 1); return 0; } @@ -203,9 +203,9 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, const void *data) { pNode = pNode->next; } - if (pNode == NULL) { - SHNode *pNewNode = doCreateHashNode(key, pHashObj->keyLen, data, pHashObj->size, hashVal); - if (pNewNode == NULL) { + if (!pNode) { + SHNode *pNewNode = doCreateHashNode(key, pHashObj->keyLen, data, pHashObj->dataLen, hashVal); + if (!pNewNode) { return -1; } pNewNode->next = pHashObj->hashList[slot]; @@ -234,7 +234,7 @@ static FORCE_INLINE SHNode *doSearchInEntryList(SSHashObj *pHashObj, const void static FORCE_INLINE bool taosHashTableEmpty(const SSHashObj *pHashObj) { return tSimpleHashGetSize(pHashObj) == 0; } void *tSimpleHashGet(SSHashObj *pHashObj, const void *key) { - if (pHashObj == NULL || taosHashTableEmpty(pHashObj) || key == NULL) { + if (!pHashObj || taosHashTableEmpty(pHashObj) || !key) { return NULL; } @@ -242,7 +242,7 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key) { int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); SHNode *pNode = pHashObj->hashList[slot]; - if (pNode == NULL) { + if (!pNode) { return NULL; } @@ -256,19 +256,43 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key) { } int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key) { - // todo + if (!pHashObj || !key) { + return TSDB_CODE_FAILED; + } + + uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)pHashObj->keyLen); + + int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); + + SHNode *pNode = pHashObj->hashList[slot]; + SHNode *pPrev = NULL; + while (pNode) { + if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pHashObj->dataLen), key, pHashObj->keyLen) == 0) { + if (!pPrev) { + pHashObj->hashList[slot] = pNode->next; + } else { + pPrev->next = pNode->next; + } + FREE_HASH_NODE(pNode); + atomic_sub_fetch_64(&pHashObj->size, 1); + break; + } + pPrev = pNode; + pNode = pNode->next; + } + return TSDB_CODE_SUCCESS; } void tSimpleHashClear(SSHashObj *pHashObj) { - if (pHashObj == NULL) { + if (!pHashObj || taosHashTableEmpty(pHashObj)) { return; } - SHNode *pNode, *pNext; + SHNode *pNode = NULL, *pNext = NULL; for (int32_t i = 0; i < pHashObj->capacity; ++i) { pNode = pHashObj->hashList[i]; - if (pNode == NULL) { + if (!pNode) { continue; } @@ -278,11 +302,11 @@ void tSimpleHashClear(SSHashObj *pHashObj) { pNode = pNext; } } - pHashObj->size = 0; + atomic_store_64(&pHashObj->size, 0); } void tSimpleHashCleanup(SSHashObj *pHashObj) { - if (pHashObj == NULL) { + if (!pHashObj) { return; } @@ -291,7 +315,7 @@ void tSimpleHashCleanup(SSHashObj *pHashObj) { } size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj) { - if (pHashObj == NULL) { + if (!pHashObj) { return 0; } @@ -299,11 +323,58 @@ size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj) { } void *tSimpleHashGetKey(const SSHashObj *pHashObj, void *data, size_t *keyLen) { +#if 0 int32_t offset = offsetof(SHNode, data); SHNode *node = ((SHNode *)(char *)data - offset); - if (keyLen != NULL) { + if (keyLen) { *keyLen = pHashObj->keyLen; } + return POINTER_SHIFT(data, pHashObj->dataLen); + return GET_SHASH_NODE_KEY(node, pHashObj->dataLen); +#endif + if (keyLen) { + *keyLen = pHashObj->keyLen; + } + + return POINTER_SHIFT(data, pHashObj->dataLen); +} + +void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) { + if (!pHashObj) { + return NULL; + } + + SHNode *pNode = NULL; + + if (!data) { + for (int32_t i = 0; i < pHashObj->capacity; ++i) { + pNode = pHashObj->hashList[i]; + if (!pNode) { + continue; + } + *iter = i; + return GET_SHASH_NODE_DATA(pNode); + } + return NULL; + } + + pNode = (SHNode *)((char *)data - offsetof(SHNode, data)); + + if (pNode->next) { + return GET_SHASH_NODE_DATA(pNode->next); + } + + ++(*iter); + for (int32_t i = *iter; i < pHashObj->capacity; ++i) { + pNode = pHashObj->hashList[i]; + if (!pNode) { + continue; + } + *iter = i; + return GET_SHASH_NODE_DATA(pNode); + } + + return NULL; } \ No newline at end of file diff --git a/source/libs/executor/test/CMakeLists.txt b/source/libs/executor/test/CMakeLists.txt index acab27ec08..18ca954813 100644 --- a/source/libs/executor/test/CMakeLists.txt +++ b/source/libs/executor/test/CMakeLists.txt @@ -17,4 +17,19 @@ IF(NOT TD_DARWIN) PUBLIC "${TD_SOURCE_DIR}/include/libs/executor/" PRIVATE "${TD_SOURCE_DIR}/source/libs/executor/inc" ) -ENDIF () \ No newline at end of file +ENDIF () + +# SET(CMAKE_CXX_STANDARD 11) +# AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) + +# ADD_EXECUTABLE(tSimpleHashTest tSimpleHashTests.cpp) +# TARGET_LINK_LIBRARIES( +# tSimpleHashTest +# PRIVATE os util common executor gtest_main +# ) + +# TARGET_INCLUDE_DIRECTORIES( +# tSimpleHashTest +# PUBLIC "${TD_SOURCE_DIR}/include/common" +# PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +# ) \ No newline at end of file diff --git a/source/libs/executor/test/tSimpleHashTests.cpp b/source/libs/executor/test/tSimpleHashTests.cpp new file mode 100644 index 0000000000..a17a7146ea --- /dev/null +++ b/source/libs/executor/test/tSimpleHashTests.cpp @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include "taos.h" +#include "thash.h" +#include "tsimplehash.h" + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wwrite-strings" +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" + +// int main(int argc, char **argv) { +// testing::InitGoogleTest(&argc, argv); +// return RUN_ALL_TESTS(); +// } + +TEST(testCase, tSimpleHashTest) { + SSHashObj *pHashObj = + tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), sizeof(int64_t), sizeof(int64_t)); + + assert(pHashObj != nullptr); + + ASSERT_EQ(0, tSimpleHashGetSize(pHashObj)); + + int64_t originKeySum = 0; + for (int64_t i = 1; i <= 100; ++i) { + originKeySum += i; + tSimpleHashPut(pHashObj, (const void *)&i, (const void *)&i); + ASSERT_EQ(i, tSimpleHashGetSize(pHashObj)); + } + + for (int64_t i = 1; i <= 100; ++i) { + void *data = tSimpleHashGet(pHashObj, (const void *)&i); + ASSERT_EQ(i, *(int64_t *)data); + } + + + void *data = NULL; + int32_t iter = 0; + int64_t keySum = 0; + int64_t dataSum = 0; + while ((data = tSimpleHashIterate(pHashObj, data, &iter))) { + void *key = tSimpleHashGetKey(pHashObj, data, NULL); + keySum += *(int64_t *)key; + dataSum += *(int64_t *)data; + } + + ASSERT_EQ(keySum, dataSum); + ASSERT_EQ(keySum, originKeySum); + + for (int64_t i = 1; i <= 100; ++i) { + tSimpleHashRemove(pHashObj, (const void *)&i); + ASSERT_EQ(100 - i, tSimpleHashGetSize(pHashObj)); + } + + tSimpleHashCleanup(pHashObj); +} + +#pragma GCC diagnostic pop \ No newline at end of file diff --git a/tests/script/tsim/sma/tsmaCreateInsertQuery.sim b/tests/script/tsim/sma/tsmaCreateInsertQuery.sim index cc1d507df2..2ff01263a4 100644 --- a/tests/script/tsim/sma/tsmaCreateInsertQuery.sim +++ b/tests/script/tsim/sma/tsmaCreateInsertQuery.sim @@ -61,6 +61,7 @@ endi print =============== select * from stb from memory in designated vgroup sql select _wstart, _wend, min(c1),max(c2),max(c1) from stb interval(5m,10s) sliding(5m); print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 if $rows != 1 then print rows $rows != 1 return -1