diff --git a/source/libs/executor/src/dataDeleter.c b/source/libs/executor/src/dataDeleter.c index aa377cd922..9f0ea0a87f 100644 --- a/source/libs/executor/src/dataDeleter.c +++ b/source/libs/executor/src/dataDeleter.c @@ -53,7 +53,7 @@ typedef struct SDataDeleterHandle { TdThreadMutex mutex; } SDataDeleterHandle; -static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInput, SDataDeleterBuf* pBuf) { +static int32_t toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInput, SDataDeleterBuf* pBuf) { int32_t numOfCols = LIST_LENGTH(pHandle->pSchema->pSlots); SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData; @@ -65,14 +65,23 @@ static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInp pBuf->useSize = sizeof(SDataCacheEntry); SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 0); + if (NULL == pColRes) { + QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } SColumnInfoData* pColSKey = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 1); + if (NULL == pColSKey) { + QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } SColumnInfoData* pColEKey = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 2); + if (NULL == pColEKey) { + QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } SDeleterRes* pRes = (SDeleterRes*)pEntry->data; pRes->suid = pHandle->pParam->suid; pRes->uidList = pHandle->pParam->pUidList; - strcpy(pRes->tableName, pHandle->pDeleter->tableFName); - strcpy(pRes->tsColName, pHandle->pDeleter->tsColName); + TAOS_STRCPY(pRes->tableName, pHandle->pDeleter->tableFName); + TAOS_STRCPY(pRes->tsColName, pHandle->pDeleter->tsColName); pRes->affectedRows = *(int64_t*)pColRes->pData; if (pRes->affectedRows) { @@ -88,16 +97,18 @@ static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInp pBuf->useSize += pEntry->dataLen; - atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen); - atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen); + (void)atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen); + (void)atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen); + + return TSDB_CODE_SUCCESS; } -static bool allocBuf(SDataDeleterHandle* pDeleter, const SInputData* pInput, SDataDeleterBuf* pBuf) { +static int32_t allocBuf(SDataDeleterHandle* pDeleter, const SInputData* pInput, SDataDeleterBuf* pBuf) { uint32_t capacity = pDeleter->pManager->cfg.maxDataBlockNumPerQuery; if (taosQueueItemSize(pDeleter->pDataBlocks) > capacity) { qError("SinkNode queue is full, no capacity, max:%d, current:%d, no capacity", capacity, taosQueueItemSize(pDeleter->pDataBlocks)); - return false; + return TSDB_CODE_OUT_OF_MEMORY; } pBuf->allocSize = sizeof(SDataCacheEntry) + sizeof(SDeleterRes); @@ -105,55 +116,66 @@ static bool allocBuf(SDataDeleterHandle* pDeleter, const SInputData* pInput, SDa pBuf->pData = taosMemoryMalloc(pBuf->allocSize); if (pBuf->pData == NULL) { qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno)); + return terrno; } - return NULL != pBuf->pData; + return TSDB_CODE_SUCCESS; } static int32_t updateStatus(SDataDeleterHandle* pDeleter) { - taosThreadMutexLock(&pDeleter->mutex); + (void)taosThreadMutexLock(&pDeleter->mutex); int32_t blockNums = taosQueueItemSize(pDeleter->pDataBlocks); int32_t status = (0 == blockNums ? DS_BUF_EMPTY : (blockNums < pDeleter->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL)); pDeleter->status = status; - taosThreadMutexUnlock(&pDeleter->mutex); + (void)taosThreadMutexUnlock(&pDeleter->mutex); + return status; } static int32_t getStatus(SDataDeleterHandle* pDeleter) { - taosThreadMutexLock(&pDeleter->mutex); + (void)taosThreadMutexLock(&pDeleter->mutex); int32_t status = pDeleter->status; - taosThreadMutexUnlock(&pDeleter->mutex); + (void)taosThreadMutexUnlock(&pDeleter->mutex); + return status; } static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) { SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle; - SDataDeleterBuf* pBuf; + SDataDeleterBuf* pBuf = NULL; int32_t code = taosAllocateQitem(sizeof(SDataDeleterBuf), DEF_QITEM, 0, (void**)&pBuf); if (code) { return code; } - if (!allocBuf(pDeleter, pInput, pBuf)) { + code = allocBuf(pDeleter, pInput, pBuf); + if (code) { taosFreeQitem(pBuf); - return TSDB_CODE_OUT_OF_MEMORY; + return code; } - toDataCacheEntry(pDeleter, pInput, pBuf); - taosWriteQitem(pDeleter->pDataBlocks, pBuf); + QRY_ERR_JRET(toDataCacheEntry(pDeleter, pInput, pBuf)); + QRY_ERR_JRET(taosWriteQitem(pDeleter->pDataBlocks, pBuf)); *pContinue = (DS_BUF_LOW == updateStatus(pDeleter) ? true : false); + return TSDB_CODE_SUCCESS; + +_return: + + taosFreeQitem(pBuf); + + return code; } static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) { SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle; - taosThreadMutexLock(&pDeleter->mutex); + (void)taosThreadMutexLock(&pDeleter->mutex); pDeleter->queryEnd = true; pDeleter->useconds = useconds; - taosThreadMutexUnlock(&pDeleter->mutex); + (void)taosThreadMutexUnlock(&pDeleter->mutex); } static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRawLen, bool* pQueryEnd) { @@ -165,9 +187,9 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRaw } SDataDeleterBuf* pBuf = NULL; - taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf); + (void)taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf); if (pBuf != NULL) { - memcpy(&pDeleter->nextOutput, pBuf, sizeof(SDataDeleterBuf)); + TAOS_MEMCPY(&pDeleter->nextOutput, pBuf, sizeof(SDataDeleterBuf)); taosFreeQitem(pBuf); } @@ -192,35 +214,35 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { } SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDeleter->nextOutput.pData); - memcpy(pOutput->pData, pEntry->data, pEntry->dataLen); + TAOS_MEMCPY(pOutput->pData, pEntry->data, pEntry->dataLen); pDeleter->pParam->pUidList = NULL; pOutput->numOfRows = pEntry->numOfRows; pOutput->numOfCols = pEntry->numOfCols; pOutput->compressed = pEntry->compressed; - atomic_sub_fetch_64(&pDeleter->cachedSize, pEntry->dataLen); - atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen); + (void)atomic_sub_fetch_64(&pDeleter->cachedSize, pEntry->dataLen); + (void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen); taosMemoryFreeClear(pDeleter->nextOutput.pData); // todo persistent pOutput->bufStatus = updateStatus(pDeleter); - taosThreadMutexLock(&pDeleter->mutex); + (void)taosThreadMutexLock(&pDeleter->mutex); pOutput->queryEnd = pDeleter->queryEnd; pOutput->useconds = pDeleter->useconds; pOutput->precision = pDeleter->pSchema->precision; - taosThreadMutexUnlock(&pDeleter->mutex); + (void)taosThreadMutexUnlock(&pDeleter->mutex); return TSDB_CODE_SUCCESS; } static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle; - atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDeleter->cachedSize); + (void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDeleter->cachedSize); taosMemoryFreeClear(pDeleter->nextOutput.pData); taosArrayDestroy(pDeleter->pParam->pUidList); taosMemoryFree(pDeleter->pParam); while (!taosQueueEmpty(pDeleter->pDataBlocks)) { SDataDeleterBuf* pBuf = NULL; - taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf); + (void)taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf); if (pBuf != NULL) { taosMemoryFreeClear(pBuf->pData); @@ -228,9 +250,10 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { } } taosCloseQueue(pDeleter->pDataBlocks); - taosThreadMutexDestroy(&pDeleter->mutex); + (void)taosThreadMutexDestroy(&pDeleter->mutex); taosMemoryFree(pDeleter->pManager); + return TSDB_CODE_SUCCESS; } @@ -247,8 +270,8 @@ int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pData SDataDeleterHandle* deleter = taosMemoryCalloc(1, sizeof(SDataDeleterHandle)); if (NULL == deleter) { + code = terrno; taosMemoryFree(pParam); - code = TSDB_CODE_OUT_OF_MEMORY; goto _end; } @@ -276,17 +299,22 @@ int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pData if (code) { goto _end; } - taosThreadMutexInit(&deleter->mutex, NULL); + code = taosThreadMutexInit(&deleter->mutex, NULL); + if (code) { + goto _end; + } *pHandle = deleter; return code; _end: + if (deleter != NULL) { - destroyDataSinker((SDataSinkHandle*)deleter); + (void)destroyDataSinker((SDataSinkHandle*)deleter); taosMemoryFree(deleter); } else { taosMemoryFree(pManager); } + return code; } diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 3981cedd3f..9316ba960e 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -63,7 +63,7 @@ typedef struct SDataDispatchHandle { // The length of bitmap is decided by number of rows of this data block, and the length of each column data is // recorded in the first segment, next to the struct header // clang-format on -static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) { +static int32_t toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) { int32_t numOfCols = 0; SNode* pNode; @@ -88,6 +88,9 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn if (pHandle->pCompressBuf == NULL) { // allocate additional 8 bytes to avoid invalid write if compress failed to reduce the size pHandle->pCompressBuf = taosMemoryMalloc(pBuf->allocSize + 8); + if (NULL == pHandle->pCompressBuf) { + QRY_RET(terrno); + } pHandle->bufSize = pBuf->allocSize + 8; } else { if (pHandle->bufSize < pBuf->allocSize + 8) { @@ -96,9 +99,8 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn if (p != NULL) { pHandle->pCompressBuf = p; } else { - terrno = TSDB_CODE_OUT_OF_MEMORY; - qError("failed to prepare compress buf:%d, code: out of memory", pHandle->bufSize); - return; + qError("failed to prepare compress buf:%d, code: %x", pHandle->bufSize, terrno); + return terrno; } } } @@ -114,7 +116,7 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn pEntry->compressed = 0; pEntry->dataLen = dataLen; pEntry->rawLen = dataLen; - memcpy(pEntry->data, pHandle->pCompressBuf, dataLen); + TAOS_MEMCPY(pEntry->data, pHandle->pCompressBuf, dataLen); } } else { pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, numOfCols); @@ -124,11 +126,13 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn pBuf->useSize += pEntry->dataLen; - atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen); - atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen); + (void)atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen); + (void)atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen); + + return TSDB_CODE_SUCCESS; } -static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) { +static int32_t allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) { /* uint32_t capacity = pDispatcher->pManager->cfg.maxDataBlockNumPerQuery; if (taosQueueItemSize(pDispatcher->pDataBlocks) > capacity) { @@ -142,69 +146,73 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, pBuf->pData = taosMemoryMalloc(pBuf->allocSize); if (pBuf->pData == NULL) { - qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno)); + qError("SinkNode failed to malloc memory, size:%d, code:%x", pBuf->allocSize, terrno); + return terrno; } - return NULL != pBuf->pData; + return TSDB_CODE_SUCCESS; } static int32_t updateStatus(SDataDispatchHandle* pDispatcher) { - taosThreadMutexLock(&pDispatcher->mutex); + (void)taosThreadMutexLock(&pDispatcher->mutex); int32_t blockNums = taosQueueItemSize(pDispatcher->pDataBlocks); int32_t status = (0 == blockNums ? DS_BUF_EMPTY : (blockNums < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL)); pDispatcher->status = status; - taosThreadMutexUnlock(&pDispatcher->mutex); + (void)taosThreadMutexUnlock(&pDispatcher->mutex); return status; } static int32_t getStatus(SDataDispatchHandle* pDispatcher) { - taosThreadMutexLock(&pDispatcher->mutex); + (void)taosThreadMutexLock(&pDispatcher->mutex); int32_t status = pDispatcher->status; - taosThreadMutexUnlock(&pDispatcher->mutex); + (void)taosThreadMutexUnlock(&pDispatcher->mutex); return status; } static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) { int32_t code = 0; SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; - SDataDispatchBuf* pBuf; + SDataDispatchBuf* pBuf = NULL; code = taosAllocateQitem(sizeof(SDataDispatchBuf), DEF_QITEM, 0, (void**)&pBuf); if (code) { return code; } - if (!allocBuf(pDispatcher, pInput, pBuf)) { + code = allocBuf(pDispatcher, pInput, pBuf); + if (code) { taosFreeQitem(pBuf); - return TSDB_CODE_OUT_OF_MEMORY; - } - - toDataCacheEntry(pDispatcher, pInput, pBuf); - code = taosWriteQitem(pDispatcher->pDataBlocks, pBuf); - if (code != 0) { return code; } + QRY_ERR_JRET(toDataCacheEntry(pDispatcher, pInput, pBuf)); + QRY_ERR_JRET(taosWriteQitem(pDispatcher->pDataBlocks, pBuf)); + int32_t status = updateStatus(pDispatcher); *pContinue = (status == DS_BUF_LOW || status == DS_BUF_EMPTY); return TSDB_CODE_SUCCESS; + +_return: + + taosFreeQitem(pBuf); + return code; } static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; - taosThreadMutexLock(&pDispatcher->mutex); + (void)taosThreadMutexLock(&pDispatcher->mutex); pDispatcher->queryEnd = true; pDispatcher->useconds = useconds; - taosThreadMutexUnlock(&pDispatcher->mutex); + (void)taosThreadMutexUnlock(&pDispatcher->mutex); } static void resetDispatcher(struct SDataSinkHandle* pHandle) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; - taosThreadMutexLock(&pDispatcher->mutex); + (void)taosThreadMutexLock(&pDispatcher->mutex); pDispatcher->queryEnd = false; - taosThreadMutexUnlock(&pDispatcher->mutex); + (void)taosThreadMutexUnlock(&pDispatcher->mutex); } static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRowLen, bool* pQueryEnd) { @@ -216,9 +224,9 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRow } SDataDispatchBuf* pBuf = NULL; - taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf); + (void)taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf); if (pBuf != NULL) { - memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf)); + TAOS_MEMCPY(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf)); taosFreeQitem(pBuf); } @@ -243,33 +251,34 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { } SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData); - memcpy(pOutput->pData, pEntry->data, pEntry->dataLen); + TAOS_MEMCPY(pOutput->pData, pEntry->data, pEntry->dataLen); pOutput->numOfRows = pEntry->numOfRows; pOutput->numOfCols = pEntry->numOfCols; pOutput->compressed = pEntry->compressed; - atomic_sub_fetch_64(&pDispatcher->cachedSize, pEntry->dataLen); - atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen); + (void)atomic_sub_fetch_64(&pDispatcher->cachedSize, pEntry->dataLen); + (void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen); taosMemoryFreeClear(pDispatcher->nextOutput.pData); // todo persistent pOutput->bufStatus = updateStatus(pDispatcher); - taosThreadMutexLock(&pDispatcher->mutex); + + (void)taosThreadMutexLock(&pDispatcher->mutex); pOutput->queryEnd = pDispatcher->queryEnd; pOutput->useconds = pDispatcher->useconds; pOutput->precision = pDispatcher->pSchema->precision; - taosThreadMutexUnlock(&pDispatcher->mutex); + (void)taosThreadMutexUnlock(&pDispatcher->mutex); return TSDB_CODE_SUCCESS; } static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; - atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDispatcher->cachedSize); + (void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDispatcher->cachedSize); taosMemoryFreeClear(pDispatcher->nextOutput.pData); while (!taosQueueEmpty(pDispatcher->pDataBlocks)) { SDataDispatchBuf* pBuf = NULL; - taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf); + (void)taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf); if (pBuf != NULL) { taosMemoryFreeClear(pBuf->pData); taosFreeQitem(pBuf); @@ -280,7 +289,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { taosMemoryFreeClear(pDispatcher->pCompressBuf); pDispatcher->bufSize = 0; - taosThreadMutexDestroy(&pDispatcher->mutex); + (void)taosThreadMutexDestroy(&pDispatcher->mutex); taosMemoryFree(pDispatcher->pManager); return TSDB_CODE_SUCCESS; } @@ -297,7 +306,6 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD SDataDispatchHandle* dispatcher = taosMemoryCalloc(1, sizeof(SDataDispatchHandle)); if (NULL == dispatcher) { - terrno = TSDB_CODE_OUT_OF_MEMORY; goto _return; } @@ -318,7 +326,11 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD terrno = code; goto _return; } - taosThreadMutexInit(&dispatcher->mutex, NULL); + code = taosThreadMutexInit(&dispatcher->mutex, NULL); + if (code) { + terrno = code; + goto _return; + } if (NULL == dispatcher->pDataBlocks) { taosMemoryFree(dispatcher); @@ -330,6 +342,7 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD return TSDB_CODE_SUCCESS; _return: + taosMemoryFree(pManager); return terrno; } diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index 5ba2f8bf42..6f226ecb21 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -58,10 +58,17 @@ int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) { SSubmitRspParam* pParam = (SSubmitRspParam*)param; SDataInserterHandle* pInserter = pParam->pInserter; - pInserter->submitRes.code = code; - + if (code) { + pInserter->submitRes.code = code; + } + if (code == TSDB_CODE_SUCCESS) { pInserter->submitRes.pRsp = taosMemoryCalloc(1, sizeof(SSubmitRsp2)); + if (NULL == pInserter->submitRes.pRsp) { + pInserter->submitRes.code = terrno; + goto _return; + } + SDecoder coder = {0}; tDecoderInit(&coder, pMsg->pData, pMsg->len); code = tDecodeSSubmitRsp2(&coder, pInserter->submitRes.pRsp); @@ -77,6 +84,10 @@ int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) { for (int32_t i = 0; i < numOfTables; ++i) { SVCreateTbRsp* pRsp = taosArrayGet(pCreateTbList, i); + if (NULL == pRsp) { + pInserter->submitRes.code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + goto _return; + } if (TSDB_CODE_SUCCESS != pRsp->code) { code = pRsp->code; taosMemoryFree(pInserter->submitRes.pRsp); @@ -94,8 +105,10 @@ int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) { } _return: - tsem_post(&pInserter->ready); + + (void)tsem_post(&pInserter->ready); taosMemoryFree(pMsg->pData); + return TSDB_CODE_SUCCESS; } @@ -105,11 +118,15 @@ static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, void* pMsg, int SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (NULL == pMsgSendInfo) { taosMemoryFreeClear(pMsg); - terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno; } SSubmitRspParam* pParam = taosMemoryCalloc(1, sizeof(SSubmitRspParam)); + if (NULL == pParam) { + taosMemoryFreeClear(pMsg); + taosMemoryFreeClear(pMsgSendInfo); + return terrno; + } pParam->pInserter = pInserter; pMsgSendInfo->param = pParam; @@ -133,7 +150,7 @@ static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int len += sizeof(SSubmitReq2Msg); pBuf = taosMemoryMalloc(len); if (NULL == pBuf) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } ((SSubmitReq2Msg*)pBuf)->header.vgId = htonl(vgId); ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len); @@ -149,6 +166,7 @@ static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int } else { taosMemoryFree(pBuf); } + return code; } @@ -162,12 +180,10 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp if (NULL == pReq) { if (!(pReq = taosMemoryMalloc(sizeof(SSubmitReq2)))) { - terrno = TSDB_CODE_OUT_OF_MEMORY; goto _end; } if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) { - terrno = TSDB_CODE_OUT_OF_MEMORY; goto _end; } } @@ -208,6 +224,10 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp } SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, colIdx); + if (NULL == pColInfoData) { + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + goto _end; + } void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); switch (pColInfoData->info.type) { @@ -217,13 +237,17 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp ASSERT(pColInfoData->info.type == pCol->type); if (colDataIsNull_s(pColInfoData, j)) { SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); - taosArrayPush(pVals, &cv); + if (NULL == taosArrayPush(pVals, &cv)) { + goto _end; + } } else { void* data = colDataGetVarData(pColInfoData, j); SValue sv = (SValue){ .type = pCol->type, .nData = varDataLen(data), .pData = varDataVal(data)}; // address copy, no value SColVal cv = COL_VAL_VALUE(pCol->colId, sv); - taosArrayPush(pVals, &cv); + if (NULL == taosArrayPush(pVals, &cv)) { + goto _end; + } } break; } @@ -245,7 +269,9 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp } SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); // should use pCol->type - taosArrayPush(pVals, &cv); + if (NULL == taosArrayPush(pVals, &cv)) { + goto _end; + } } else { if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId && !needSortMerge) { if (*(int64_t*)var <= lastTs) { @@ -256,9 +282,11 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp } SValue sv = {.type = pCol->type}; - memcpy(&sv.val, var, tDataTypes[pCol->type].bytes); + TAOS_MEMCPY(&sv.val, var, tDataTypes[pCol->type].bytes); SColVal cv = COL_VAL_VALUE(pCol->colId, sv); - taosArrayPush(pVals, &cv); + if (NULL == taosArrayPush(pVals, &cv)) { + goto _end; + } } } else { uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type); @@ -274,7 +302,9 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE); goto _end; } - taosArrayPush(tbData.aRowP, &pRow); + if (NULL == taosArrayPush(tbData.aRowP, &pRow)) { + goto _end; + } } if (needSortMerge) { @@ -284,9 +314,12 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp } } - taosArrayPush(pReq->aSubmitTbData, &tbData); + if (NULL == taosArrayPush(pReq->aSubmitTbData, &tbData)) { + goto _end; + } _end: + taosArrayDestroy(pVals); if (terrno != 0) { *ppReq = NULL; @@ -294,9 +327,11 @@ _end: tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE); taosMemoryFree(pReq); } + return terrno; } *ppReq = pReq; + return TSDB_CODE_SUCCESS; } @@ -312,7 +347,9 @@ int32_t dataBlocksToSubmitReq(SDataInserterHandle* pInserter, void** pMsg, int32 for (int32_t i = 0; i < sz; i++) { SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i); - + if (NULL == pDataBlock) { + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + } code = buildSubmitReqFromBlock(pInserter, &pReq, pDataBlock, pTSchema, uid, vgId, suid); if (code) { if (pReq) { @@ -334,7 +371,9 @@ int32_t dataBlocksToSubmitReq(SDataInserterHandle* pInserter, void** pMsg, int32 static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) { SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle; if (!pInserter->explain) { - taosArrayPush(pInserter->pDataBlocks, &pInput->pData); + if (NULL == taosArrayPush(pInserter->pDataBlocks, &pInput->pData)) { + return terrno; + } void* pMsg = NULL; int32_t msgLen = 0; int32_t code = dataBlocksToSubmitReq(pInserter, &pMsg, &msgLen); @@ -350,7 +389,7 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, return code; } - tsem_wait(&pInserter->ready); + QRY_ERR_RET(tsem_wait(&pInserter->ready)); if (pInserter->submitRes.code) { return pInserter->submitRes.code; @@ -364,10 +403,10 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) { SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle; - taosThreadMutexLock(&pInserter->mutex); + (void)taosThreadMutexLock(&pInserter->mutex); pInserter->queryEnd = true; pInserter->useconds = useconds; - taosThreadMutexUnlock(&pInserter->mutex); + (void)taosThreadMutexUnlock(&pInserter->mutex); } static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRawLen, bool* pQueryEnd) { @@ -378,12 +417,12 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRaw static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle; - atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pInserter->cachedSize); + (void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pInserter->cachedSize); taosArrayDestroy(pInserter->pDataBlocks); taosMemoryFree(pInserter->pSchema); taosMemoryFree(pInserter->pParam); taosHashCleanup(pInserter->pCols); - taosThreadMutexDestroy(&pInserter->mutex); + (void)taosThreadMutexDestroy(&pInserter->mutex); taosMemoryFree(pInserter->pManager); return TSDB_CODE_SUCCESS; @@ -401,7 +440,6 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat SDataInserterHandle* inserter = taosMemoryCalloc(1, sizeof(SDataInserterHandle)); if (NULL == inserter) { taosMemoryFree(pParam); - terrno = TSDB_CODE_OUT_OF_MEMORY; goto _return; } @@ -432,28 +470,31 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat } inserter->pDataBlocks = taosArrayInit(1, POINTER_BYTES); - taosThreadMutexInit(&inserter->mutex, NULL); if (NULL == inserter->pDataBlocks) { - terrno = TSDB_CODE_OUT_OF_MEMORY; goto _return; } + QRY_ERR_JRET(taosThreadMutexInit(&inserter->mutex, NULL)); inserter->fullOrderColList = pInserterNode->pCols->length == inserter->pSchema->numOfCols; inserter->pCols = taosHashInit(pInserterNode->pCols->length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK); + if (NULL == inserter->pCols) { + goto _return; + } + SNode* pNode = NULL; int32_t i = 0; FOREACH(pNode, pInserterNode->pCols) { SColumnNode* pCol = (SColumnNode*)pNode; - taosHashPut(inserter->pCols, &pCol->colId, sizeof(pCol->colId), &pCol->slotId, sizeof(pCol->slotId)); + QRY_ERR_JRET(taosHashPut(inserter->pCols, &pCol->colId, sizeof(pCol->colId), &pCol->slotId, sizeof(pCol->slotId))); if (inserter->fullOrderColList && pCol->colId != inserter->pSchema->columns[i].colId) { inserter->fullOrderColList = false; } ++i; } - tsem_init(&inserter->ready, 0, 0); + QRY_ERR_JRET(tsem_init(&inserter->ready, 0, 0)); *pHandle = inserter; return TSDB_CODE_SUCCESS; @@ -461,7 +502,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat _return: if (inserter) { - destroyDataSinker((SDataSinkHandle*)inserter); + (void)destroyDataSinker((SDataSinkHandle*)inserter); taosMemoryFree(inserter); } else { taosMemoryFree(pManager); diff --git a/source/libs/executor/src/dataSinkMgt.c b/source/libs/executor/src/dataSinkMgt.c index e711ffdf5c..55fc520477 100644 --- a/source/libs/executor/src/dataSinkMgt.c +++ b/source/libs/executor/src/dataSinkMgt.c @@ -23,20 +23,20 @@ SDataSinkStat gDataSinkStat = {0}; int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg, SStorageAPI* pAPI, void** ppSinkManager) { SDataSinkManager* pSinkManager = taosMemoryMalloc(sizeof(SDataSinkManager)); if (NULL == pSinkManager) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } pSinkManager->cfg = *cfg; pSinkManager->pAPI = pAPI; *ppSinkManager = pSinkManager; - return 0; // to avoid compiler eror + return TSDB_CODE_SUCCESS; // to avoid compiler eror } int32_t dsDataSinkGetCacheSize(SDataSinkStat* pStat) { pStat->cachedSize = atomic_load_64(&gDataSinkStat.cachedSize); - return 0; + return TSDB_CODE_SUCCESS; } int32_t dsCreateDataSinker(void* pSinkManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id) { @@ -56,6 +56,7 @@ int32_t dsCreateDataSinker(void* pSinkManager, const SDataSinkNode* pDataSink, D taosMemoryFree(pSinkManager); qError("invalid input node type:%d, %s", nodeType(pDataSink), id); + return TSDB_CODE_QRY_INVALID_INPUT; } @@ -97,6 +98,6 @@ void dsScheduleProcess(void* ahandle, void* pItem) { void dsDestroyDataSinker(DataSinkHandle handle) { SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; - pHandleImpl->fDestroy(pHandleImpl); + (void)pHandleImpl->fDestroy(pHandleImpl); taosMemoryFree(pHandleImpl); }