fix: function return code validation

This commit is contained in:
dapan1121 2024-07-19 17:37:48 +08:00
parent fc1753c1f1
commit e5fde30635
2 changed files with 140 additions and 62 deletions

View File

@ -61,6 +61,7 @@ typedef enum { M2C = 0, C2M } ConvType;
#define TAOS_STRCPY(_dst, _src) ((void)strcpy(_dst, _src)) #define TAOS_STRCPY(_dst, _src) ((void)strcpy(_dst, _src))
#define TAOS_STRNCPY(_dst, _src, _size) ((void)strncpy(_dst, _src, _size)) #define TAOS_STRNCPY(_dst, _src, _size) ((void)strncpy(_dst, _src, _size))
#define TAOS_STRCAT(_dst, _src) ((void)strcat(_dst, _src))
char *tstrdup(const char *src); char *tstrdup(const char *src);
int32_t taosUcs4len(TdUcs4 *ucs4); int32_t taosUcs4len(TdUcs4 *ucs4);

View File

@ -30,9 +30,9 @@
static void removeGroupCacheFile(SGroupCacheFileInfo* pFileInfo) { static void removeGroupCacheFile(SGroupCacheFileInfo* pFileInfo) {
if (pFileInfo->fd.fd) { if (pFileInfo->fd.fd) {
taosCloseFile(&pFileInfo->fd.fd); (void)taosCloseFile(&pFileInfo->fd.fd);
pFileInfo->fd.fd = NULL; pFileInfo->fd.fd = NULL;
taosThreadMutexDestroy(&pFileInfo->fd.mutex); (void)taosThreadMutexDestroy(&pFileInfo->fd.mutex);
} }
pFileInfo->deleted = true; pFileInfo->deleted = true;
} }
@ -88,7 +88,7 @@ static void logGroupCacheExecInfo(SGroupCacheOperatorInfo* pGrpCacheOperator) {
static void freeSGcSessionCtx(void* p) { static void freeSGcSessionCtx(void* p) {
SGcSessionCtx* pSession = p; SGcSessionCtx* pSession = p;
if (pSession->semInit) { if (pSession->semInit) {
tsem_destroy(&pSession->waitSem); (void)tsem_destroy(&pSession->waitSem);
} }
} }
@ -148,6 +148,10 @@ void blockDataDeepCleanup(SSDataBlock* pDataBlock) {
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
if (NULL == p) {
qError("fail to get %dth col in dataBlock, numOfCols:%d", i, numOfCols);
continue;
}
taosMemoryFreeClear(p->pData); taosMemoryFreeClear(p->pData);
if (IS_VAR_DATA_TYPE(p->info.type)) { if (IS_VAR_DATA_TYPE(p->info.type)) {
taosMemoryFreeClear(p->varmeta.offset); taosMemoryFreeClear(p->varmeta.offset);
@ -196,10 +200,14 @@ static FORCE_INLINE int32_t initOpenCacheFile(SGroupCacheFileFd* pFileFd, char*
TdFilePtr newFd = taosOpenFile(filename, TD_FILE_CREATE|TD_FILE_READ|TD_FILE_WRITE|TD_FILE_AUTO_DEL); TdFilePtr newFd = taosOpenFile(filename, TD_FILE_CREATE|TD_FILE_READ|TD_FILE_WRITE|TD_FILE_AUTO_DEL);
//TdFilePtr newFd = taosOpenFile(filename, TD_FILE_CREATE|TD_FILE_READ|TD_FILE_WRITE); //TdFilePtr newFd = taosOpenFile(filename, TD_FILE_CREATE|TD_FILE_READ|TD_FILE_WRITE);
if (NULL == newFd) { if (NULL == newFd) {
return TAOS_SYSTEM_ERROR(errno); QRY_ERR_RET(TAOS_SYSTEM_ERROR(errno));
} }
pFileFd->fd = newFd; pFileFd->fd = newFd;
taosThreadMutexInit(&pFileFd->mutex, NULL); int32_t code = taosThreadMutexInit(&pFileFd->mutex, NULL);
if (code) {
qError("taosThreadMutexInit failed, code:%x", code);
QRY_ERR_RET(code);
}
qTrace("file path %s created", filename); qTrace("file path %s created", filename);
@ -218,13 +226,16 @@ static int32_t acquireFdFromFileCtx(SGcFileCacheCtx* pFileCtx, int32_t fileId, S
SGroupCacheFileInfo* pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId)); SGroupCacheFileInfo* pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId));
if (NULL == pTmp) { if (NULL == pTmp) {
sprintf(&pFileCtx->baseFilename[pFileCtx->baseNameLen], "_%d", fileId); (void)sprintf(&pFileCtx->baseFilename[pFileCtx->baseNameLen], "_%d", fileId);
SGroupCacheFileInfo newFile = {0}; SGroupCacheFileInfo newFile = {0};
taosHashPut(pFileCtx->pCacheFile, &fileId, sizeof(fileId), &newFile, sizeof(newFile)); if (taosHashPut(pFileCtx->pCacheFile, &fileId, sizeof(fileId), &newFile, sizeof(newFile))) {
QRY_ERR_RET(terrno);
}
pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId)); pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId));
if (NULL == pTmp) { if (NULL == pTmp) {
return TSDB_CODE_OUT_OF_MEMORY; qError("fail to get file %d from pCacheFile", fileId);
QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
} }
} }
@ -240,14 +251,14 @@ static int32_t acquireFdFromFileCtx(SGcFileCacheCtx* pFileCtx, int32_t fileId, S
} }
} }
taosThreadMutexLock(&pTmp->fd.mutex); (void)taosThreadMutexLock(&pTmp->fd.mutex);
*ppFd = &pTmp->fd; *ppFd = &pTmp->fd;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static FORCE_INLINE void releaseFdToFileCtx(SGroupCacheFileFd* pFd) { static FORCE_INLINE void releaseFdToFileCtx(SGroupCacheFileFd* pFd) {
taosThreadMutexUnlock(&pFd->mutex); (void)taosThreadMutexUnlock(&pFd->mutex);
} }
static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcBlkBufInfo* pHead) { static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcBlkBufInfo* pHead) {
@ -275,7 +286,7 @@ static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamC
int64_t blkId = pHead->basic.blkId; int64_t blkId = pHead->basic.blkId;
pHead = pHead->next; pHead = pHead->next;
taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId)); (void)taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId));
continue; continue;
} }
@ -295,7 +306,7 @@ static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamC
int64_t blkId = pHead->basic.blkId; int64_t blkId = pHead->basic.blkId;
pHead = pHead->next; pHead = pHead->next;
taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId)); (void)taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId));
continue; continue;
} }
@ -321,7 +332,7 @@ static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamC
int64_t blkId = pHead->basic.blkId; int64_t blkId = pHead->basic.blkId;
pHead = pHead->next; pHead = pHead->next;
taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId)); (void)taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId));
} }
_return: _return:
@ -330,7 +341,7 @@ _return:
taosHashRelease(pGrpHash, pGroup); taosHashRelease(pGrpHash, pGroup);
} }
atomic_val_compare_exchange_32(&pGCache->blkCache.writeDownstreamId, pCtx->id, -1); (void)atomic_val_compare_exchange_32(&pGCache->blkCache.writeDownstreamId, pCtx->id, -1);
return code; return code;
} }
@ -341,7 +352,8 @@ static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstr
} }
pBufInfo = taosHashGet(pCache->pDirtyBlk, &pBufInfo->basic.blkId, sizeof(pBufInfo->basic.blkId)); pBufInfo = taosHashGet(pCache->pDirtyBlk, &pBufInfo->basic.blkId, sizeof(pBufInfo->basic.blkId));
if (NULL == pBufInfo) { if (NULL == pBufInfo) {
return TSDB_CODE_OUT_OF_MEMORY; qError("fail to get blk %d from pCache->pDirtyBlk", pBufInfo->basic.blkId);
QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
} }
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SGcBlkBufInfo* pWriteHead = NULL; SGcBlkBufInfo* pWriteHead = NULL;
@ -424,7 +436,7 @@ static int32_t addBlkToBufCache(struct SOperatorInfo* pOperator, SSDataBlock* pB
qError("group cache add block to cache failed, size:%" PRId64, bufSize); qError("group cache add block to cache failed, size:%" PRId64, bufSize);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
blockDataToBuf(pBufInfo->pBuf, pBlock); QRY_ERR_RET(blockDataToBuf(pBufInfo->pBuf, pBlock));
SGcFileCacheCtx* pFileCtx = pGCache->batchFetch ? &pCtx->fileCtx : &pGroup->pVgCtx->fileCtx; SGcFileCacheCtx* pFileCtx = pGCache->batchFetch ? &pCtx->fileCtx : &pGroup->pVgCtx->fileCtx;
@ -449,6 +461,10 @@ void blockDataDeepClear(SSDataBlock* pDataBlock) {
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
if (NULL == p) {
qError("fail to get %d col from pDataBlock, numOfCols:%d", i, numOfCols);
continue;
}
p->pData = NULL; p->pData = NULL;
if (IS_VAR_DATA_TYPE(p->info.type)) { if (IS_VAR_DATA_TYPE(p->info.type)) {
p->varmeta.offset = NULL; p->varmeta.offset = NULL;
@ -473,8 +489,9 @@ static int32_t buildGroupCacheBaseBlock(SSDataBlock** ppDst, SSDataBlock* pSrc)
taosMemoryFree(*ppDst); taosMemoryFree(*ppDst);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
memcpy(&(*ppDst)->info, &pSrc->info, sizeof(pSrc->info)); TAOS_MEMCPY(&(*ppDst)->info, &pSrc->info, sizeof(pSrc->info));
blockDataDeepClear(*ppDst); blockDataDeepClear(*ppDst);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -490,11 +507,17 @@ static int32_t acquireBaseBlockFromList(SGcDownstreamCtx* pCtx, SSDataBlock** pp
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void releaseBaseBlockToList(SGcDownstreamCtx* pCtx, SSDataBlock* pBlock) { static int32_t releaseBaseBlockToList(SGcDownstreamCtx* pCtx, SSDataBlock* pBlock) {
int32_t code = TSDB_CODE_SUCCESS;
blockDataDeepCleanup(pBlock); blockDataDeepCleanup(pBlock);
taosWLockLatch(&pCtx->blkLock); taosWLockLatch(&pCtx->blkLock);
taosArrayPush(pCtx->pFreeBlock, &pBlock); if (NULL == taosArrayPush(pCtx->pFreeBlock, &pBlock)) {
code = terrno;
}
taosWUnLockLatch(&pCtx->blkLock); taosWUnLockLatch(&pCtx->blkLock);
return code;
} }
@ -573,14 +596,15 @@ static int32_t retrieveBlkFromBufCache(SGroupCacheOperatorInfo* pGCache, SGroupC
return code; return code;
} }
taosHashPut(pCache->pReadBlk, &sessionId, sizeof(sessionId), ppRes, POINTER_BYTES); QRY_ERR_RET(taosHashPut(pCache->pReadBlk, &sessionId, sizeof(sessionId), ppRes, POINTER_BYTES));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static FORCE_INLINE void initGcVgroupCtx(SOperatorInfo* pOperator, SGcVgroupCtx* pVgCtx, int32_t downstreamId, int32_t vgId, SArray* pTbList) { static FORCE_INLINE void initGcVgroupCtx(SOperatorInfo* pOperator, SGcVgroupCtx* pVgCtx, int32_t downstreamId, int32_t vgId, SArray* pTbList) {
pVgCtx->pTbList = pTbList; pVgCtx->pTbList = pTbList;
pVgCtx->id = vgId; pVgCtx->id = vgId;
snprintf(pVgCtx->fileCtx.baseFilename, sizeof(pVgCtx->fileCtx.baseFilename) - 1, "%s/gc_%d_%" PRIx64 "_%" PRIu64 "_%d_%d", (void)snprintf(pVgCtx->fileCtx.baseFilename, sizeof(pVgCtx->fileCtx.baseFilename) - 1, "%s/gc_%d_%" PRIx64 "_%" PRIu64 "_%d_%d",
tsTempDir, getpid(), pOperator->pTaskInfo->id.queryId, pOperator->pTaskInfo->id.taskId, downstreamId, vgId); tsTempDir, getpid(), pOperator->pTaskInfo->id.queryId, pOperator->pTaskInfo->id.taskId, downstreamId, vgId);
pVgCtx->fileCtx.baseFilename[sizeof(pVgCtx->fileCtx.baseFilename) - 1] = 0; pVgCtx->fileCtx.baseFilename[sizeof(pVgCtx->fileCtx.baseFilename) - 1] = 0;
@ -594,15 +618,27 @@ static int32_t addNewGroupToVgHash(SOperatorInfo* pOperator, SSHashObj* pHash, S
if (NULL == pList) { if (NULL == pList) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
taosArrayPush(pList, pNew); if (NULL == taosArrayPush(pList, pNew)) {
QRY_ERR_RET(terrno);
}
SGcVgroupCtx vgCtx = {0}; SGcVgroupCtx vgCtx = {0};
initGcVgroupCtx(pOperator, &vgCtx, pNew->pGroup->downstreamIdx, pNew->vgId, pList); initGcVgroupCtx(pOperator, &vgCtx, pNew->pGroup->downstreamIdx, pNew->vgId, pList);
tSimpleHashPut(pHash, &pNew->vgId, sizeof(pNew->vgId), &vgCtx, sizeof(vgCtx)); QRY_ERR_RET(tSimpleHashPut(pHash, &pNew->vgId, sizeof(pNew->vgId), &vgCtx, sizeof(vgCtx)));
pNew->pGroup->pVgCtx = tSimpleHashGet(pHash, &pNew->vgId, sizeof(pNew->vgId)); pNew->pGroup->pVgCtx = tSimpleHashGet(pHash, &pNew->vgId, sizeof(pNew->vgId));
if (NULL == pNew->pGroup->pVgCtx) {
qError("fail to get vg %d ctx from vgHash", pNew->vgId);
QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
taosArrayPush(pVgCtx->pTbList, pNew); if (NULL == taosArrayPush(pVgCtx->pTbList, pNew)) {
QRY_ERR_RET(terrno);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -620,6 +656,11 @@ static FORCE_INLINE int32_t appendNewGroupToDownstream(struct SOperatorInfo* pOp
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
SGcNewGroupInfo* pNew = taosArrayGet(pCtx->pNewGrpList, i); SGcNewGroupInfo* pNew = taosArrayGet(pCtx->pNewGrpList, i);
if (NULL == pNew) {
qError("fail to get vg %d SGcNewGroupInfo from pNewGrpList", i);
QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
if (!pGCache->batchFetch) { if (!pGCache->batchFetch) {
code = addNewGroupToVgHash(pOperator, pCtx->pVgTbHash, pNew); code = addNewGroupToVgHash(pOperator, pCtx->pVgTbHash, pNew);
if (code) { if (code) {
@ -672,7 +713,9 @@ static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* p
if (code) { if (code) {
return code; return code;
} }
taosArrayPush(pGCache->pDownstreams[downstreamIdx].pFreeBlock, &pGCache->pDownstreams[downstreamIdx].pBaseBlock); if (NULL == taosArrayPush(pGCache->pDownstreams[downstreamIdx].pFreeBlock, &pGCache->pDownstreams[downstreamIdx].pBaseBlock)) {
QRY_ERR_RET(terrno);
}
} }
} }
@ -681,26 +724,36 @@ static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* p
return code; return code;
} }
static void notifyWaitingSessions(SArray* pWaitQueue) { static int32_t notifyWaitingSessions(SArray* pWaitQueue) {
if (NULL == pWaitQueue || taosArrayGetSize(pWaitQueue) <= 0) { if (NULL == pWaitQueue || taosArrayGetSize(pWaitQueue) <= 0) {
return; return TSDB_CODE_SUCCESS;
} }
int32_t n = taosArrayGetSize(pWaitQueue); int32_t n = taosArrayGetSize(pWaitQueue);
for (int32_t i = 0; i < n; ++i) { for (int32_t i = 0; i < n; ++i) {
SGcSessionCtx* pSession = taosArrayGetP(pWaitQueue, i); SGcSessionCtx* pSession = taosArrayGetP(pWaitQueue, i);
tsem_post(&pSession->waitSem); if (NULL == pSession) {
qError("fail to get %d SGcSessionCtx in pWaitQueue, total:%d", i, n);
QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
QRY_ERR_RET(tsem_post(&pSession->waitSem));
} }
return TSDB_CODE_SUCCESS;
} }
static FORCE_INLINE void handleGroupFetchDone(SGroupCacheData* pGroup) { static FORCE_INLINE int32_t handleGroupFetchDone(SGroupCacheData* pGroup) {
int32_t code = TSDB_CODE_SUCCESS;
pGroup->pBlock = NULL; pGroup->pBlock = NULL;
atomic_store_8((int8_t*)&pGroup->fetchDone, true); atomic_store_8((int8_t*)&pGroup->fetchDone, true);
taosThreadMutexLock(&pGroup->mutex); (void)taosThreadMutexLock(&pGroup->mutex);
notifyWaitingSessions(pGroup->waitQueue); code = notifyWaitingSessions(pGroup->waitQueue);
taosArrayClear(pGroup->waitQueue); taosArrayClear(pGroup->waitQueue);
taosThreadMutexUnlock(&pGroup->mutex); (void)taosThreadMutexUnlock(&pGroup->mutex);
return code;
} }
static int32_t addFileRefTableNum(SGcFileCacheCtx* pFileCtx, int32_t fileId, int32_t downstreamId, int32_t vgId) { static int32_t addFileRefTableNum(SGcFileCacheCtx* pFileCtx, int32_t fileId, int32_t downstreamId, int32_t vgId) {
@ -714,14 +767,15 @@ static int32_t addFileRefTableNum(SGcFileCacheCtx* pFileCtx, int32_t fileId, int
SGroupCacheFileInfo* pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId)); SGroupCacheFileInfo* pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId));
if (NULL == pTmp) { if (NULL == pTmp) {
sprintf(&pFileCtx->baseFilename[pFileCtx->baseNameLen], "_%u", fileId); (void)sprintf(&pFileCtx->baseFilename[pFileCtx->baseNameLen], "_%u", fileId);
SGroupCacheFileInfo newFile = {0}; SGroupCacheFileInfo newFile = {0};
newFile.groupNum = 1; newFile.groupNum = 1;
taosHashPut(pFileCtx->pCacheFile, &fileId, sizeof(fileId), &newFile, sizeof(newFile)); QRY_ERR_RET(taosHashPut(pFileCtx->pCacheFile, &fileId, sizeof(fileId), &newFile, sizeof(newFile)));
pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId)); pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId));
if (NULL == pTmp) { if (NULL == pTmp) {
return TSDB_CODE_OUT_OF_MEMORY; qError("fail to get file %d in pCacheFile", fileId);
QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
} }
} else { } else {
pTmp->groupNum++; pTmp->groupNum++;
@ -746,7 +800,7 @@ static int32_t handleVgroupTableFetchDone(SGcDownstreamCtx* pCtx, SGroupCacheDat
if (NULL == pNew || pNew->uid == uid) { if (NULL == pNew || pNew->uid == uid) {
break; break;
} }
handleGroupFetchDone(pNew->pGroup); QRY_ERR_RET(handleGroupFetchDone(pNew->pGroup));
} }
groupCacheSwitchNewFile(&pGroup->pVgCtx->fileCtx, pGroup->downstreamIdx, pGroup->vgId, true); groupCacheSwitchNewFile(&pGroup->pVgCtx->fileCtx, pGroup->downstreamIdx, pGroup->vgId, true);
@ -764,15 +818,21 @@ static int32_t handleVgroupTableFetchDone(SGcDownstreamCtx* pCtx, SGroupCacheDat
} }
static FORCE_INLINE void initNewGroupData(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, int32_t downstreamIdx, int32_t vgId, bool batchFetch, bool needCache) { static FORCE_INLINE int32_t initNewGroupData(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, int32_t downstreamIdx, int32_t vgId, bool batchFetch, bool needCache) {
taosThreadMutexInit(&pGroup->mutex, NULL); QRY_ERR_RET(taosThreadMutexInit(&pGroup->mutex, NULL));
pGroup->downstreamIdx = downstreamIdx; pGroup->downstreamIdx = downstreamIdx;
pGroup->vgId = vgId; pGroup->vgId = vgId;
pGroup->fileId = -1; pGroup->fileId = -1;
pGroup->blkList.pList = taosArrayInit(10, sizeof(SGcBlkBufBasic)); pGroup->blkList.pList = taosArrayInit(10, sizeof(SGcBlkBufBasic));
if (NULL == pGroup->blkList.pList) {
QRY_ERR_RET(terrno);
}
pGroup->startOffset = -1; pGroup->startOffset = -1;
pGroup->needCache = needCache; pGroup->needCache = needCache;
pGroup->pVgCtx = tSimpleHashGet(pCtx->pVgTbHash, &pGroup->vgId, sizeof(pGroup->vgId)); pGroup->pVgCtx = tSimpleHashGet(pCtx->pVgTbHash, &pGroup->vgId, sizeof(pGroup->vgId));
return TSDB_CODE_SUCCESS;
} }
static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SGroupCacheData** ppGrp, int32_t vgId, int64_t uid) { static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SGroupCacheData** ppGrp, int32_t vgId, int64_t uid) {
@ -801,7 +861,7 @@ static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam*
if (NULL == *ppGrp) { if (NULL == *ppGrp) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
initNewGroupData(pCtx, *ppGrp, pParam->downstreamIdx, vgId, pGCache->batchFetch, pGcParam->needCache); QRY_ERR_RET(initNewGroupData(pCtx, *ppGrp, pParam->downstreamIdx, vgId, pGCache->batchFetch, pGcParam->needCache));
qDebug("new group %" PRIu64 " initialized, downstreamIdx:%d, vgId:%d, needCache:%d", uid, pParam->downstreamIdx, vgId, pGcParam->needCache); qDebug("new group %" PRIu64 " initialized, downstreamIdx:%d, vgId:%d, needCache:%d", uid, pParam->downstreamIdx, vgId, pGcParam->needCache);
@ -829,7 +889,9 @@ static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam*
static int32_t addBlkToGroupCache(bool batchFetch, SGroupCacheData* pGroup, SGcBlkBufInfo* pNewBlk, int64_t* pIdx) { static int32_t addBlkToGroupCache(bool batchFetch, SGroupCacheData* pGroup, SGcBlkBufInfo* pNewBlk, int64_t* pIdx) {
taosWLockLatch(&pGroup->blkList.lock); taosWLockLatch(&pGroup->blkList.lock);
taosArrayPush(pGroup->blkList.pList, &pNewBlk->basic); if (NULL == taosArrayPush(pGroup->blkList.pList, &pNewBlk->basic)) {
QRY_ERR_RET(terrno);
}
*pIdx = taosArrayGetSize(pGroup->blkList.pList) - 1; *pIdx = taosArrayGetSize(pGroup->blkList.pList) - 1;
taosWUnLockLatch(&pGroup->blkList.lock); taosWUnLockLatch(&pGroup->blkList.lock);
@ -889,7 +951,7 @@ static int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSD
pGroup->pBlock = pBlock; pGroup->pBlock = pBlock;
} }
notifyWaitingSessions(pGroup->waitQueue); QRY_ERR_RET(notifyWaitingSessions(pGroup->waitQueue));
if (pGroup == pSession->pGroupData) { if (pGroup == pSession->pGroupData) {
if (pGroup->needCache) { if (pGroup->needCache) {
pSession->lastBlkId = newBlkIdx; pSession->lastBlkId = newBlkIdx;
@ -909,7 +971,7 @@ static int32_t handleDownstreamFetchDone(struct SOperatorInfo* pOperator, SGcSes
SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash; SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
SGroupCacheData* pGroup = NULL; SGroupCacheData* pGroup = NULL;
while (NULL != (pGroup = taosHashIterate(pGrpHash, pGroup))) { while (NULL != (pGroup = taosHashIterate(pGrpHash, pGroup))) {
handleGroupFetchDone(pGroup); QRY_ERR_RET(handleGroupFetchDone(pGroup));
} }
pCtx->fetchDone = true; pCtx->fetchDone = true;
} else { } else {
@ -920,7 +982,7 @@ static int32_t handleDownstreamFetchDone(struct SOperatorInfo* pOperator, SGcSes
uidNum = taosArrayGetSize(pVgCtx->pTbList); uidNum = taosArrayGetSize(pVgCtx->pTbList);
for (int32_t i = 0; i < uidNum; ++i) { for (int32_t i = 0; i < uidNum; ++i) {
SGcNewGroupInfo* pNew = taosArrayGet(pVgCtx->pTbList, i); SGcNewGroupInfo* pNew = taosArrayGet(pVgCtx->pTbList, i);
handleGroupFetchDone(pNew->pGroup); QRY_ERR_RET(handleGroupFetchDone(pNew->pGroup));
} }
taosArrayClear(pVgCtx->pTbList); taosArrayClear(pVgCtx->pTbList);
} }
@ -961,8 +1023,8 @@ static int32_t getCacheBlkFromDownstreamOperator(struct SOperatorInfo* pOperator
} }
SGcSessionCtx* pWaitCtx = *ppWaitCtx; SGcSessionCtx* pWaitCtx = *ppWaitCtx;
pWaitCtx->newFetch = true; pWaitCtx->newFetch = true;
taosHashRemove(pCtx->pWaitSessions, pSessionId, sizeof(*pSessionId)); (void)taosHashRemove(pCtx->pWaitSessions, pSessionId, sizeof(*pSessionId));
tsem_post(&pWaitCtx->waitSem); QRY_ERR_RET(tsem_post(&pWaitCtx->waitSem));
return code; return code;
} }
@ -1028,36 +1090,47 @@ static int32_t groupCacheSessionWait(struct SOperatorInfo* pOperator, SGcDownstr
SGroupCacheOperatorInfo* pGCache = pOperator->info; SGroupCacheOperatorInfo* pGCache = pOperator->info;
SGroupCacheData* pGroup = pSession->pGroupData; SGroupCacheData* pGroup = pSession->pGroupData;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
bool inLock = true;
if (NULL == pGroup->waitQueue) { if (NULL == pGroup->waitQueue) {
pGroup->waitQueue = taosArrayInit(1, POINTER_BYTES); pGroup->waitQueue = taosArrayInit(1, POINTER_BYTES);
if (NULL == pGroup->waitQueue) { if (NULL == pGroup->waitQueue) {
taosThreadMutexUnlock(&pSession->pGroupData->mutex); QRY_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
return TSDB_CODE_OUT_OF_MEMORY;
} }
} }
taosArrayPush(pGroup->waitQueue, &pSession); if (NULL == taosArrayPush(pGroup->waitQueue, &pSession)) {
QRY_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
if (!pSession->semInit) { if (!pSession->semInit) {
tsem_init(&pSession->waitSem, 0, 0); QRY_ERR_JRET(tsem_init(&pSession->waitSem, 0, 0));
pSession->semInit = true; pSession->semInit = true;
} }
taosThreadMutexUnlock(&pSession->pGroupData->mutex); (void)taosThreadMutexUnlock(&pSession->pGroupData->mutex);
inLock = false;
taosHashPut(pCtx->pWaitSessions, &sessionId, sizeof(sessionId), &pSession, POINTER_BYTES); QRY_ERR_JRET(taosHashPut(pCtx->pWaitSessions, &sessionId, sizeof(sessionId), &pSession, POINTER_BYTES));
tsem_wait(&pSession->waitSem); (void)tsem_wait(&pSession->waitSem);
if (pSession->newFetch) { if (pSession->newFetch) {
pSession->newFetch = false; pSession->newFetch = false;
return getCacheBlkFromDownstreamOperator(pOperator, pCtx, sessionId, pSession, ppRes); return getCacheBlkFromDownstreamOperator(pOperator, pCtx, sessionId, pSession, ppRes);
} }
taosHashRemove(pCtx->pWaitSessions, &sessionId, sizeof(sessionId)); (void)taosHashRemove(pCtx->pWaitSessions, &sessionId, sizeof(sessionId));
bool got = false; bool got = false;
return getBlkFromSessionCacheImpl(pOperator, sessionId, pSession, ppRes, &got); return getBlkFromSessionCacheImpl(pOperator, sessionId, pSession, ppRes, &got);
_return:
if (inLock) {
(void)taosThreadMutexUnlock(&pSession->pGroupData->mutex);
}
return code;
} }
@ -1077,7 +1150,7 @@ static int32_t getBlkFromSessionCache(struct SOperatorInfo* pOperator, int64_t s
if ((atomic_load_64(&pCtx->fetchSessionId) == sessionId) if ((atomic_load_64(&pCtx->fetchSessionId) == sessionId)
|| (-1 == atomic_val_compare_exchange_64(&pCtx->fetchSessionId, -1, sessionId))) { || (-1 == atomic_val_compare_exchange_64(&pCtx->fetchSessionId, -1, sessionId))) {
if (locked) { if (locked) {
taosThreadMutexUnlock(&pSession->pGroupData->mutex); (void)taosThreadMutexUnlock(&pSession->pGroupData->mutex);
locked = false; locked = false;
} }
@ -1095,7 +1168,7 @@ static int32_t getBlkFromSessionCache(struct SOperatorInfo* pOperator, int64_t s
break; break;
} }
taosThreadMutexLock(&pSession->pGroupData->mutex); (void)taosThreadMutexLock(&pSession->pGroupData->mutex);
locked = true; locked = true;
}; };
@ -1103,7 +1176,7 @@ static int32_t getBlkFromSessionCache(struct SOperatorInfo* pOperator, int64_t s
_return: _return:
if (locked) { if (locked) {
taosThreadMutexUnlock(&pSession->pGroupData->mutex); (void)taosThreadMutexUnlock(&pSession->pGroupData->mutex);
} }
return code; return code;
@ -1165,6 +1238,10 @@ static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SOperatorP
} }
*ppSession = taosHashGet(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId)); *ppSession = taosHashGet(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
if (NULL == *ppSession) {
qError("fail to get session %d from pSessions", pGcParam->sessionId);
QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1187,15 +1264,15 @@ static int32_t getBlkFromGroupCache(struct SOperatorInfo* pOperator, SSDataBlock
} else if (pSession->pGroupData->needCache) { } else if (pSession->pGroupData->needCache) {
SSDataBlock** ppBlock = taosHashGet(pGCache->blkCache.pReadBlk, &pGcParam->sessionId, sizeof(pGcParam->sessionId)); SSDataBlock** ppBlock = taosHashGet(pGCache->blkCache.pReadBlk, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
if (ppBlock) { if (ppBlock) {
releaseBaseBlockToList(pCtx, *ppBlock); QRY_ERR_RET(releaseBaseBlockToList(pCtx, *ppBlock));
taosHashRemove(pGCache->blkCache.pReadBlk, &pGcParam->sessionId, sizeof(pGcParam->sessionId)); (void)taosHashRemove(pGCache->blkCache.pReadBlk, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
} }
} }
code = getBlkFromSessionCache(pOperator, pGcParam->sessionId, pSession, ppRes); code = getBlkFromSessionCache(pOperator, pGcParam->sessionId, pSession, ppRes);
if (NULL == *ppRes) { if (NULL == *ppRes) {
qDebug("session %" PRId64 " in downstream %d total got %" PRId64 " rows", pGcParam->sessionId, pCtx->id, pSession->resRows); qDebug("session %" PRId64 " in downstream %d total got %" PRId64 " rows", pGcParam->sessionId, pCtx->id, pSession->resRows);
taosHashRemove(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId)); (void)taosHashRemove(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
} else { } else {
pSession->resRows += (*ppRes)->info.rows; pSession->resRows += (*ppRes)->info.rows;
qDebug("session %" PRId64 " in downstream %d got %" PRId64 " rows in one block", pGcParam->sessionId, pCtx->id, (*ppRes)->info.rows); qDebug("session %" PRId64 " in downstream %d got %" PRId64 " rows in one block", pGcParam->sessionId, pCtx->id, (*ppRes)->info.rows);
@ -1241,7 +1318,7 @@ static void freeRemoveGroupCacheData(void* p) {
taosArrayDestroy(pGroup->waitQueue); taosArrayDestroy(pGroup->waitQueue);
taosArrayDestroy(pGroup->blkList.pList); taosArrayDestroy(pGroup->blkList.pList);
taosThreadMutexDestroy(&pGroup->mutex); (void)taosThreadMutexDestroy(&pGroup->mutex);
qTrace("group removed"); qTrace("group removed");
} }
@ -1271,7 +1348,7 @@ static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) {
int32_t defaultVg = 0; int32_t defaultVg = 0;
SGcVgroupCtx vgCtx = {0}; SGcVgroupCtx vgCtx = {0};
initGcVgroupCtx(pOperator, &vgCtx, pCtx->id, defaultVg, NULL); initGcVgroupCtx(pOperator, &vgCtx, pCtx->id, defaultVg, NULL);
tSimpleHashPut(pCtx->pVgTbHash, &defaultVg, sizeof(defaultVg), &vgCtx, sizeof(vgCtx)); QRY_ERR_RET(tSimpleHashPut(pCtx->pVgTbHash, &defaultVg, sizeof(defaultVg), &vgCtx, sizeof(vgCtx)));
} }
pCtx->pNewGrpList = taosArrayInit(10, sizeof(SGcNewGroupInfo)); pCtx->pNewGrpList = taosArrayInit(10, sizeof(SGcNewGroupInfo));
@ -1302,7 +1379,7 @@ static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
snprintf(pCtx->fileCtx.baseFilename, sizeof(pCtx->fileCtx.baseFilename) - 1, "%s/gc_%d_%" PRIx64 "_%" PRIu64 "_%d", (void)snprintf(pCtx->fileCtx.baseFilename, sizeof(pCtx->fileCtx.baseFilename) - 1, "%s/gc_%d_%" PRIx64 "_%" PRIu64 "_%d",
tsTempDir, getpid(), pOperator->pTaskInfo->id.queryId, pOperator->pTaskInfo->id.taskId, pCtx->id); tsTempDir, getpid(), pOperator->pTaskInfo->id.queryId, pOperator->pTaskInfo->id.taskId, pCtx->id);
pCtx->fileCtx.baseFilename[sizeof(pCtx->fileCtx.baseFilename) - 1] = 0; pCtx->fileCtx.baseFilename[sizeof(pCtx->fileCtx.baseFilename) - 1] = 0;
pCtx->fileCtx.baseNameLen = strlen(pCtx->fileCtx.baseFilename); pCtx->fileCtx.baseNameLen = strlen(pCtx->fileCtx.baseFilename);