diff --git a/include/os/osString.h b/include/os/osString.h index e53aceb83a..05492763c2 100644 --- a/include/os/osString.h +++ b/include/os/osString.h @@ -61,6 +61,7 @@ typedef enum { M2C = 0, C2M } ConvType; #define TAOS_STRCPY(_dst, _src) ((void)strcpy(_dst, _src)) #define TAOS_STRNCPY(_dst, _src, _size) ((void)strncpy(_dst, _src, _size)) +#define TAOS_STRCAT(_dst, _src) ((void)strcat(_dst, _src)) char *tstrdup(const char *src); int32_t taosUcs4len(TdUcs4 *ucs4); diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index 28bccaedff..379cceeb03 100755 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -30,9 +30,9 @@ static void removeGroupCacheFile(SGroupCacheFileInfo* pFileInfo) { if (pFileInfo->fd.fd) { - taosCloseFile(&pFileInfo->fd.fd); + (void)taosCloseFile(&pFileInfo->fd.fd); pFileInfo->fd.fd = NULL; - taosThreadMutexDestroy(&pFileInfo->fd.mutex); + (void)taosThreadMutexDestroy(&pFileInfo->fd.mutex); } pFileInfo->deleted = true; } @@ -88,7 +88,7 @@ static void logGroupCacheExecInfo(SGroupCacheOperatorInfo* pGrpCacheOperator) { static void freeSGcSessionCtx(void* p) { SGcSessionCtx* pSession = p; if (pSession->semInit) { - tsem_destroy(&pSession->waitSem); + (void)tsem_destroy(&pSession->waitSem); } } @@ -148,6 +148,10 @@ void blockDataDeepCleanup(SSDataBlock* pDataBlock) { size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); + if (NULL == p) { + qError("fail to get %dth col in dataBlock, numOfCols:%d", i, numOfCols); + continue; + } taosMemoryFreeClear(p->pData); if (IS_VAR_DATA_TYPE(p->info.type)) { taosMemoryFreeClear(p->varmeta.offset); @@ -196,10 +200,14 @@ static FORCE_INLINE int32_t initOpenCacheFile(SGroupCacheFileFd* pFileFd, char* TdFilePtr newFd = taosOpenFile(filename, TD_FILE_CREATE|TD_FILE_READ|TD_FILE_WRITE|TD_FILE_AUTO_DEL); //TdFilePtr newFd = taosOpenFile(filename, TD_FILE_CREATE|TD_FILE_READ|TD_FILE_WRITE); if (NULL == newFd) { - return TAOS_SYSTEM_ERROR(errno); + QRY_ERR_RET(TAOS_SYSTEM_ERROR(errno)); } pFileFd->fd = newFd; - taosThreadMutexInit(&pFileFd->mutex, NULL); + int32_t code = taosThreadMutexInit(&pFileFd->mutex, NULL); + if (code) { + qError("taosThreadMutexInit failed, code:%x", code); + QRY_ERR_RET(code); + } qTrace("file path %s created", filename); @@ -218,13 +226,16 @@ static int32_t acquireFdFromFileCtx(SGcFileCacheCtx* pFileCtx, int32_t fileId, S SGroupCacheFileInfo* pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId)); if (NULL == pTmp) { - sprintf(&pFileCtx->baseFilename[pFileCtx->baseNameLen], "_%d", fileId); + (void)sprintf(&pFileCtx->baseFilename[pFileCtx->baseNameLen], "_%d", fileId); SGroupCacheFileInfo newFile = {0}; - taosHashPut(pFileCtx->pCacheFile, &fileId, sizeof(fileId), &newFile, sizeof(newFile)); + if (taosHashPut(pFileCtx->pCacheFile, &fileId, sizeof(fileId), &newFile, sizeof(newFile))) { + QRY_ERR_RET(terrno); + } pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId)); if (NULL == pTmp) { - return TSDB_CODE_OUT_OF_MEMORY; + qError("fail to get file %d from pCacheFile", fileId); + QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); } } @@ -240,14 +251,14 @@ static int32_t acquireFdFromFileCtx(SGcFileCacheCtx* pFileCtx, int32_t fileId, S } } - taosThreadMutexLock(&pTmp->fd.mutex); + (void)taosThreadMutexLock(&pTmp->fd.mutex); *ppFd = &pTmp->fd; return TSDB_CODE_SUCCESS; } static FORCE_INLINE void releaseFdToFileCtx(SGroupCacheFileFd* pFd) { - taosThreadMutexUnlock(&pFd->mutex); + (void)taosThreadMutexUnlock(&pFd->mutex); } static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcBlkBufInfo* pHead) { @@ -275,7 +286,7 @@ static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamC int64_t blkId = pHead->basic.blkId; pHead = pHead->next; - taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId)); + (void)taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId)); continue; } @@ -295,7 +306,7 @@ static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamC int64_t blkId = pHead->basic.blkId; pHead = pHead->next; - taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId)); + (void)taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId)); continue; } @@ -321,7 +332,7 @@ static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamC int64_t blkId = pHead->basic.blkId; pHead = pHead->next; - taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId)); + (void)taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId)); } _return: @@ -330,7 +341,7 @@ _return: taosHashRelease(pGrpHash, pGroup); } - atomic_val_compare_exchange_32(&pGCache->blkCache.writeDownstreamId, pCtx->id, -1); + (void)atomic_val_compare_exchange_32(&pGCache->blkCache.writeDownstreamId, pCtx->id, -1); return code; } @@ -341,7 +352,8 @@ static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstr } pBufInfo = taosHashGet(pCache->pDirtyBlk, &pBufInfo->basic.blkId, sizeof(pBufInfo->basic.blkId)); if (NULL == pBufInfo) { - return TSDB_CODE_OUT_OF_MEMORY; + qError("fail to get blk %d from pCache->pDirtyBlk", pBufInfo->basic.blkId); + QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); } int32_t code = TSDB_CODE_SUCCESS; SGcBlkBufInfo* pWriteHead = NULL; @@ -424,7 +436,7 @@ static int32_t addBlkToBufCache(struct SOperatorInfo* pOperator, SSDataBlock* pB qError("group cache add block to cache failed, size:%" PRId64, bufSize); return TSDB_CODE_OUT_OF_MEMORY; } - blockDataToBuf(pBufInfo->pBuf, pBlock); + QRY_ERR_RET(blockDataToBuf(pBufInfo->pBuf, pBlock)); SGcFileCacheCtx* pFileCtx = pGCache->batchFetch ? &pCtx->fileCtx : &pGroup->pVgCtx->fileCtx; @@ -449,6 +461,10 @@ void blockDataDeepClear(SSDataBlock* pDataBlock) { size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); + if (NULL == p) { + qError("fail to get %d col from pDataBlock, numOfCols:%d", i, numOfCols); + continue; + } p->pData = NULL; if (IS_VAR_DATA_TYPE(p->info.type)) { p->varmeta.offset = NULL; @@ -473,8 +489,9 @@ static int32_t buildGroupCacheBaseBlock(SSDataBlock** ppDst, SSDataBlock* pSrc) taosMemoryFree(*ppDst); return TSDB_CODE_OUT_OF_MEMORY; } - memcpy(&(*ppDst)->info, &pSrc->info, sizeof(pSrc->info)); + TAOS_MEMCPY(&(*ppDst)->info, &pSrc->info, sizeof(pSrc->info)); blockDataDeepClear(*ppDst); + return TSDB_CODE_SUCCESS; } @@ -490,11 +507,17 @@ static int32_t acquireBaseBlockFromList(SGcDownstreamCtx* pCtx, SSDataBlock** pp return TSDB_CODE_SUCCESS; } -static void releaseBaseBlockToList(SGcDownstreamCtx* pCtx, SSDataBlock* pBlock) { +static int32_t releaseBaseBlockToList(SGcDownstreamCtx* pCtx, SSDataBlock* pBlock) { + int32_t code = TSDB_CODE_SUCCESS; + blockDataDeepCleanup(pBlock); taosWLockLatch(&pCtx->blkLock); - taosArrayPush(pCtx->pFreeBlock, &pBlock); + if (NULL == taosArrayPush(pCtx->pFreeBlock, &pBlock)) { + code = terrno; + } taosWUnLockLatch(&pCtx->blkLock); + + return code; } @@ -573,14 +596,15 @@ static int32_t retrieveBlkFromBufCache(SGroupCacheOperatorInfo* pGCache, SGroupC return code; } - taosHashPut(pCache->pReadBlk, &sessionId, sizeof(sessionId), ppRes, POINTER_BYTES); + QRY_ERR_RET(taosHashPut(pCache->pReadBlk, &sessionId, sizeof(sessionId), ppRes, POINTER_BYTES)); + return TSDB_CODE_SUCCESS; } static FORCE_INLINE void initGcVgroupCtx(SOperatorInfo* pOperator, SGcVgroupCtx* pVgCtx, int32_t downstreamId, int32_t vgId, SArray* pTbList) { pVgCtx->pTbList = pTbList; pVgCtx->id = vgId; - snprintf(pVgCtx->fileCtx.baseFilename, sizeof(pVgCtx->fileCtx.baseFilename) - 1, "%s/gc_%d_%" PRIx64 "_%" PRIu64 "_%d_%d", + (void)snprintf(pVgCtx->fileCtx.baseFilename, sizeof(pVgCtx->fileCtx.baseFilename) - 1, "%s/gc_%d_%" PRIx64 "_%" PRIu64 "_%d_%d", tsTempDir, getpid(), pOperator->pTaskInfo->id.queryId, pOperator->pTaskInfo->id.taskId, downstreamId, vgId); pVgCtx->fileCtx.baseFilename[sizeof(pVgCtx->fileCtx.baseFilename) - 1] = 0; @@ -594,15 +618,27 @@ static int32_t addNewGroupToVgHash(SOperatorInfo* pOperator, SSHashObj* pHash, S if (NULL == pList) { return TSDB_CODE_OUT_OF_MEMORY; } - taosArrayPush(pList, pNew); + if (NULL == taosArrayPush(pList, pNew)) { + QRY_ERR_RET(terrno); + } + SGcVgroupCtx vgCtx = {0}; initGcVgroupCtx(pOperator, &vgCtx, pNew->pGroup->downstreamIdx, pNew->vgId, pList); - tSimpleHashPut(pHash, &pNew->vgId, sizeof(pNew->vgId), &vgCtx, sizeof(vgCtx)); + QRY_ERR_RET(tSimpleHashPut(pHash, &pNew->vgId, sizeof(pNew->vgId), &vgCtx, sizeof(vgCtx))); + pNew->pGroup->pVgCtx = tSimpleHashGet(pHash, &pNew->vgId, sizeof(pNew->vgId)); + if (NULL == pNew->pGroup->pVgCtx) { + qError("fail to get vg %d ctx from vgHash", pNew->vgId); + QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + return TSDB_CODE_SUCCESS; } - taosArrayPush(pVgCtx->pTbList, pNew); + if (NULL == taosArrayPush(pVgCtx->pTbList, pNew)) { + QRY_ERR_RET(terrno); + } + return TSDB_CODE_SUCCESS; } @@ -620,6 +656,11 @@ static FORCE_INLINE int32_t appendNewGroupToDownstream(struct SOperatorInfo* pOp for (int32_t i = 0; i < num; ++i) { SGcNewGroupInfo* pNew = taosArrayGet(pCtx->pNewGrpList, i); + if (NULL == pNew) { + qError("fail to get vg %d SGcNewGroupInfo from pNewGrpList", i); + QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + if (!pGCache->batchFetch) { code = addNewGroupToVgHash(pOperator, pCtx->pVgTbHash, pNew); if (code) { @@ -672,7 +713,9 @@ static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* p if (code) { return code; } - taosArrayPush(pGCache->pDownstreams[downstreamIdx].pFreeBlock, &pGCache->pDownstreams[downstreamIdx].pBaseBlock); + if (NULL == taosArrayPush(pGCache->pDownstreams[downstreamIdx].pFreeBlock, &pGCache->pDownstreams[downstreamIdx].pBaseBlock)) { + QRY_ERR_RET(terrno); + } } } @@ -681,26 +724,36 @@ static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* p return code; } -static void notifyWaitingSessions(SArray* pWaitQueue) { +static int32_t notifyWaitingSessions(SArray* pWaitQueue) { if (NULL == pWaitQueue || taosArrayGetSize(pWaitQueue) <= 0) { - return; + return TSDB_CODE_SUCCESS; } int32_t n = taosArrayGetSize(pWaitQueue); for (int32_t i = 0; i < n; ++i) { SGcSessionCtx* pSession = taosArrayGetP(pWaitQueue, i); - tsem_post(&pSession->waitSem); + if (NULL == pSession) { + qError("fail to get %d SGcSessionCtx in pWaitQueue, total:%d", i, n); + QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + + QRY_ERR_RET(tsem_post(&pSession->waitSem)); } + + return TSDB_CODE_SUCCESS; } -static FORCE_INLINE void handleGroupFetchDone(SGroupCacheData* pGroup) { +static FORCE_INLINE int32_t handleGroupFetchDone(SGroupCacheData* pGroup) { + int32_t code = TSDB_CODE_SUCCESS; pGroup->pBlock = NULL; atomic_store_8((int8_t*)&pGroup->fetchDone, true); - taosThreadMutexLock(&pGroup->mutex); - notifyWaitingSessions(pGroup->waitQueue); + (void)taosThreadMutexLock(&pGroup->mutex); + code = notifyWaitingSessions(pGroup->waitQueue); taosArrayClear(pGroup->waitQueue); - taosThreadMutexUnlock(&pGroup->mutex); + (void)taosThreadMutexUnlock(&pGroup->mutex); + + return code; } static int32_t addFileRefTableNum(SGcFileCacheCtx* pFileCtx, int32_t fileId, int32_t downstreamId, int32_t vgId) { @@ -714,14 +767,15 @@ static int32_t addFileRefTableNum(SGcFileCacheCtx* pFileCtx, int32_t fileId, int SGroupCacheFileInfo* pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId)); if (NULL == pTmp) { - sprintf(&pFileCtx->baseFilename[pFileCtx->baseNameLen], "_%u", fileId); + (void)sprintf(&pFileCtx->baseFilename[pFileCtx->baseNameLen], "_%u", fileId); SGroupCacheFileInfo newFile = {0}; newFile.groupNum = 1; - taosHashPut(pFileCtx->pCacheFile, &fileId, sizeof(fileId), &newFile, sizeof(newFile)); + QRY_ERR_RET(taosHashPut(pFileCtx->pCacheFile, &fileId, sizeof(fileId), &newFile, sizeof(newFile))); pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId)); if (NULL == pTmp) { - return TSDB_CODE_OUT_OF_MEMORY; + qError("fail to get file %d in pCacheFile", fileId); + QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); } } else { pTmp->groupNum++; @@ -746,7 +800,7 @@ static int32_t handleVgroupTableFetchDone(SGcDownstreamCtx* pCtx, SGroupCacheDat if (NULL == pNew || pNew->uid == uid) { break; } - handleGroupFetchDone(pNew->pGroup); + QRY_ERR_RET(handleGroupFetchDone(pNew->pGroup)); } groupCacheSwitchNewFile(&pGroup->pVgCtx->fileCtx, pGroup->downstreamIdx, pGroup->vgId, true); @@ -764,15 +818,21 @@ static int32_t handleVgroupTableFetchDone(SGcDownstreamCtx* pCtx, SGroupCacheDat } -static FORCE_INLINE void initNewGroupData(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, int32_t downstreamIdx, int32_t vgId, bool batchFetch, bool needCache) { - taosThreadMutexInit(&pGroup->mutex, NULL); +static FORCE_INLINE int32_t initNewGroupData(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, int32_t downstreamIdx, int32_t vgId, bool batchFetch, bool needCache) { + QRY_ERR_RET(taosThreadMutexInit(&pGroup->mutex, NULL)); + pGroup->downstreamIdx = downstreamIdx; pGroup->vgId = vgId; pGroup->fileId = -1; pGroup->blkList.pList = taosArrayInit(10, sizeof(SGcBlkBufBasic)); + if (NULL == pGroup->blkList.pList) { + QRY_ERR_RET(terrno); + } pGroup->startOffset = -1; pGroup->needCache = needCache; pGroup->pVgCtx = tSimpleHashGet(pCtx->pVgTbHash, &pGroup->vgId, sizeof(pGroup->vgId)); + + return TSDB_CODE_SUCCESS; } static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SGroupCacheData** ppGrp, int32_t vgId, int64_t uid) { @@ -801,7 +861,7 @@ static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam* if (NULL == *ppGrp) { return TSDB_CODE_OUT_OF_MEMORY; } - initNewGroupData(pCtx, *ppGrp, pParam->downstreamIdx, vgId, pGCache->batchFetch, pGcParam->needCache); + QRY_ERR_RET(initNewGroupData(pCtx, *ppGrp, pParam->downstreamIdx, vgId, pGCache->batchFetch, pGcParam->needCache)); qDebug("new group %" PRIu64 " initialized, downstreamIdx:%d, vgId:%d, needCache:%d", uid, pParam->downstreamIdx, vgId, pGcParam->needCache); @@ -829,7 +889,9 @@ static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam* static int32_t addBlkToGroupCache(bool batchFetch, SGroupCacheData* pGroup, SGcBlkBufInfo* pNewBlk, int64_t* pIdx) { taosWLockLatch(&pGroup->blkList.lock); - taosArrayPush(pGroup->blkList.pList, &pNewBlk->basic); + if (NULL == taosArrayPush(pGroup->blkList.pList, &pNewBlk->basic)) { + QRY_ERR_RET(terrno); + } *pIdx = taosArrayGetSize(pGroup->blkList.pList) - 1; taosWUnLockLatch(&pGroup->blkList.lock); @@ -889,7 +951,7 @@ static int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSD pGroup->pBlock = pBlock; } - notifyWaitingSessions(pGroup->waitQueue); + QRY_ERR_RET(notifyWaitingSessions(pGroup->waitQueue)); if (pGroup == pSession->pGroupData) { if (pGroup->needCache) { pSession->lastBlkId = newBlkIdx; @@ -909,7 +971,7 @@ static int32_t handleDownstreamFetchDone(struct SOperatorInfo* pOperator, SGcSes SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash; SGroupCacheData* pGroup = NULL; while (NULL != (pGroup = taosHashIterate(pGrpHash, pGroup))) { - handleGroupFetchDone(pGroup); + QRY_ERR_RET(handleGroupFetchDone(pGroup)); } pCtx->fetchDone = true; } else { @@ -920,7 +982,7 @@ static int32_t handleDownstreamFetchDone(struct SOperatorInfo* pOperator, SGcSes uidNum = taosArrayGetSize(pVgCtx->pTbList); for (int32_t i = 0; i < uidNum; ++i) { SGcNewGroupInfo* pNew = taosArrayGet(pVgCtx->pTbList, i); - handleGroupFetchDone(pNew->pGroup); + QRY_ERR_RET(handleGroupFetchDone(pNew->pGroup)); } taosArrayClear(pVgCtx->pTbList); } @@ -961,8 +1023,8 @@ static int32_t getCacheBlkFromDownstreamOperator(struct SOperatorInfo* pOperator } SGcSessionCtx* pWaitCtx = *ppWaitCtx; pWaitCtx->newFetch = true; - taosHashRemove(pCtx->pWaitSessions, pSessionId, sizeof(*pSessionId)); - tsem_post(&pWaitCtx->waitSem); + (void)taosHashRemove(pCtx->pWaitSessions, pSessionId, sizeof(*pSessionId)); + QRY_ERR_RET(tsem_post(&pWaitCtx->waitSem)); return code; } @@ -1028,36 +1090,47 @@ static int32_t groupCacheSessionWait(struct SOperatorInfo* pOperator, SGcDownstr SGroupCacheOperatorInfo* pGCache = pOperator->info; SGroupCacheData* pGroup = pSession->pGroupData; int32_t code = TSDB_CODE_SUCCESS; + bool inLock = true; if (NULL == pGroup->waitQueue) { pGroup->waitQueue = taosArrayInit(1, POINTER_BYTES); if (NULL == pGroup->waitQueue) { - taosThreadMutexUnlock(&pSession->pGroupData->mutex); - return TSDB_CODE_OUT_OF_MEMORY; + QRY_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } } - taosArrayPush(pGroup->waitQueue, &pSession); + if (NULL == taosArrayPush(pGroup->waitQueue, &pSession)) { + QRY_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } if (!pSession->semInit) { - tsem_init(&pSession->waitSem, 0, 0); + QRY_ERR_JRET(tsem_init(&pSession->waitSem, 0, 0)); pSession->semInit = true; } - taosThreadMutexUnlock(&pSession->pGroupData->mutex); + (void)taosThreadMutexUnlock(&pSession->pGroupData->mutex); + inLock = false; - taosHashPut(pCtx->pWaitSessions, &sessionId, sizeof(sessionId), &pSession, POINTER_BYTES); + QRY_ERR_JRET(taosHashPut(pCtx->pWaitSessions, &sessionId, sizeof(sessionId), &pSession, POINTER_BYTES)); - tsem_wait(&pSession->waitSem); + (void)tsem_wait(&pSession->waitSem); if (pSession->newFetch) { pSession->newFetch = false; return getCacheBlkFromDownstreamOperator(pOperator, pCtx, sessionId, pSession, ppRes); } - taosHashRemove(pCtx->pWaitSessions, &sessionId, sizeof(sessionId)); + (void)taosHashRemove(pCtx->pWaitSessions, &sessionId, sizeof(sessionId)); bool got = false; return getBlkFromSessionCacheImpl(pOperator, sessionId, pSession, ppRes, &got); + +_return: + + if (inLock) { + (void)taosThreadMutexUnlock(&pSession->pGroupData->mutex); + } + + return code; } @@ -1077,7 +1150,7 @@ static int32_t getBlkFromSessionCache(struct SOperatorInfo* pOperator, int64_t s if ((atomic_load_64(&pCtx->fetchSessionId) == sessionId) || (-1 == atomic_val_compare_exchange_64(&pCtx->fetchSessionId, -1, sessionId))) { if (locked) { - taosThreadMutexUnlock(&pSession->pGroupData->mutex); + (void)taosThreadMutexUnlock(&pSession->pGroupData->mutex); locked = false; } @@ -1095,7 +1168,7 @@ static int32_t getBlkFromSessionCache(struct SOperatorInfo* pOperator, int64_t s break; } - taosThreadMutexLock(&pSession->pGroupData->mutex); + (void)taosThreadMutexLock(&pSession->pGroupData->mutex); locked = true; }; @@ -1103,7 +1176,7 @@ static int32_t getBlkFromSessionCache(struct SOperatorInfo* pOperator, int64_t s _return: if (locked) { - taosThreadMutexUnlock(&pSession->pGroupData->mutex); + (void)taosThreadMutexUnlock(&pSession->pGroupData->mutex); } return code; @@ -1165,6 +1238,10 @@ static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SOperatorP } *ppSession = taosHashGet(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId)); + if (NULL == *ppSession) { + qError("fail to get session %d from pSessions", pGcParam->sessionId); + QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } return TSDB_CODE_SUCCESS; } @@ -1187,15 +1264,15 @@ static int32_t getBlkFromGroupCache(struct SOperatorInfo* pOperator, SSDataBlock } else if (pSession->pGroupData->needCache) { SSDataBlock** ppBlock = taosHashGet(pGCache->blkCache.pReadBlk, &pGcParam->sessionId, sizeof(pGcParam->sessionId)); if (ppBlock) { - releaseBaseBlockToList(pCtx, *ppBlock); - taosHashRemove(pGCache->blkCache.pReadBlk, &pGcParam->sessionId, sizeof(pGcParam->sessionId)); + QRY_ERR_RET(releaseBaseBlockToList(pCtx, *ppBlock)); + (void)taosHashRemove(pGCache->blkCache.pReadBlk, &pGcParam->sessionId, sizeof(pGcParam->sessionId)); } } code = getBlkFromSessionCache(pOperator, pGcParam->sessionId, pSession, ppRes); if (NULL == *ppRes) { qDebug("session %" PRId64 " in downstream %d total got %" PRId64 " rows", pGcParam->sessionId, pCtx->id, pSession->resRows); - taosHashRemove(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId)); + (void)taosHashRemove(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId)); } else { pSession->resRows += (*ppRes)->info.rows; qDebug("session %" PRId64 " in downstream %d got %" PRId64 " rows in one block", pGcParam->sessionId, pCtx->id, (*ppRes)->info.rows); @@ -1241,7 +1318,7 @@ static void freeRemoveGroupCacheData(void* p) { taosArrayDestroy(pGroup->waitQueue); taosArrayDestroy(pGroup->blkList.pList); - taosThreadMutexDestroy(&pGroup->mutex); + (void)taosThreadMutexDestroy(&pGroup->mutex); qTrace("group removed"); } @@ -1271,7 +1348,7 @@ static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) { int32_t defaultVg = 0; SGcVgroupCtx vgCtx = {0}; initGcVgroupCtx(pOperator, &vgCtx, pCtx->id, defaultVg, NULL); - tSimpleHashPut(pCtx->pVgTbHash, &defaultVg, sizeof(defaultVg), &vgCtx, sizeof(vgCtx)); + QRY_ERR_RET(tSimpleHashPut(pCtx->pVgTbHash, &defaultVg, sizeof(defaultVg), &vgCtx, sizeof(vgCtx))); } pCtx->pNewGrpList = taosArrayInit(10, sizeof(SGcNewGroupInfo)); @@ -1302,7 +1379,7 @@ static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) { return TSDB_CODE_OUT_OF_MEMORY; } - snprintf(pCtx->fileCtx.baseFilename, sizeof(pCtx->fileCtx.baseFilename) - 1, "%s/gc_%d_%" PRIx64 "_%" PRIu64 "_%d", + (void)snprintf(pCtx->fileCtx.baseFilename, sizeof(pCtx->fileCtx.baseFilename) - 1, "%s/gc_%d_%" PRIx64 "_%" PRIu64 "_%d", tsTempDir, getpid(), pOperator->pTaskInfo->id.queryId, pOperator->pTaskInfo->id.taskId, pCtx->id); pCtx->fileCtx.baseFilename[sizeof(pCtx->fileCtx.baseFilename) - 1] = 0; pCtx->fileCtx.baseNameLen = strlen(pCtx->fileCtx.baseFilename);