fix: sink function return code processing

This commit is contained in:
dapan1121 2024-07-22 18:42:43 +08:00
parent ef96d37c1f
commit 14383a32a3
4 changed files with 184 additions and 101 deletions

View File

@ -53,7 +53,7 @@ typedef struct SDataDeleterHandle {
TdThreadMutex mutex;
} SDataDeleterHandle;
static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInput, SDataDeleterBuf* pBuf) {
static int32_t toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInput, SDataDeleterBuf* pBuf) {
int32_t numOfCols = LIST_LENGTH(pHandle->pSchema->pSlots);
SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
@ -65,14 +65,23 @@ static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInp
pBuf->useSize = sizeof(SDataCacheEntry);
SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 0);
if (NULL == pColRes) {
QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
SColumnInfoData* pColSKey = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 1);
if (NULL == pColSKey) {
QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
SColumnInfoData* pColEKey = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 2);
if (NULL == pColEKey) {
QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
SDeleterRes* pRes = (SDeleterRes*)pEntry->data;
pRes->suid = pHandle->pParam->suid;
pRes->uidList = pHandle->pParam->pUidList;
strcpy(pRes->tableName, pHandle->pDeleter->tableFName);
strcpy(pRes->tsColName, pHandle->pDeleter->tsColName);
TAOS_STRCPY(pRes->tableName, pHandle->pDeleter->tableFName);
TAOS_STRCPY(pRes->tsColName, pHandle->pDeleter->tsColName);
pRes->affectedRows = *(int64_t*)pColRes->pData;
if (pRes->affectedRows) {
@ -88,16 +97,18 @@ static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInp
pBuf->useSize += pEntry->dataLen;
atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen);
atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
(void)atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen);
(void)atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
return TSDB_CODE_SUCCESS;
}
static bool allocBuf(SDataDeleterHandle* pDeleter, const SInputData* pInput, SDataDeleterBuf* pBuf) {
static int32_t allocBuf(SDataDeleterHandle* pDeleter, const SInputData* pInput, SDataDeleterBuf* pBuf) {
uint32_t capacity = pDeleter->pManager->cfg.maxDataBlockNumPerQuery;
if (taosQueueItemSize(pDeleter->pDataBlocks) > capacity) {
qError("SinkNode queue is full, no capacity, max:%d, current:%d, no capacity", capacity,
taosQueueItemSize(pDeleter->pDataBlocks));
return false;
return TSDB_CODE_OUT_OF_MEMORY;
}
pBuf->allocSize = sizeof(SDataCacheEntry) + sizeof(SDeleterRes);
@ -105,55 +116,66 @@ static bool allocBuf(SDataDeleterHandle* pDeleter, const SInputData* pInput, SDa
pBuf->pData = taosMemoryMalloc(pBuf->allocSize);
if (pBuf->pData == NULL) {
qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno));
return terrno;
}
return NULL != pBuf->pData;
return TSDB_CODE_SUCCESS;
}
static int32_t updateStatus(SDataDeleterHandle* pDeleter) {
taosThreadMutexLock(&pDeleter->mutex);
(void)taosThreadMutexLock(&pDeleter->mutex);
int32_t blockNums = taosQueueItemSize(pDeleter->pDataBlocks);
int32_t status =
(0 == blockNums ? DS_BUF_EMPTY
: (blockNums < pDeleter->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL));
pDeleter->status = status;
taosThreadMutexUnlock(&pDeleter->mutex);
(void)taosThreadMutexUnlock(&pDeleter->mutex);
return status;
}
static int32_t getStatus(SDataDeleterHandle* pDeleter) {
taosThreadMutexLock(&pDeleter->mutex);
(void)taosThreadMutexLock(&pDeleter->mutex);
int32_t status = pDeleter->status;
taosThreadMutexUnlock(&pDeleter->mutex);
(void)taosThreadMutexUnlock(&pDeleter->mutex);
return status;
}
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
SDataDeleterBuf* pBuf;
SDataDeleterBuf* pBuf = NULL;
int32_t code = taosAllocateQitem(sizeof(SDataDeleterBuf), DEF_QITEM, 0, (void**)&pBuf);
if (code) {
return code;
}
if (!allocBuf(pDeleter, pInput, pBuf)) {
code = allocBuf(pDeleter, pInput, pBuf);
if (code) {
taosFreeQitem(pBuf);
return TSDB_CODE_OUT_OF_MEMORY;
return code;
}
toDataCacheEntry(pDeleter, pInput, pBuf);
taosWriteQitem(pDeleter->pDataBlocks, pBuf);
QRY_ERR_JRET(toDataCacheEntry(pDeleter, pInput, pBuf));
QRY_ERR_JRET(taosWriteQitem(pDeleter->pDataBlocks, pBuf));
*pContinue = (DS_BUF_LOW == updateStatus(pDeleter) ? true : false);
return TSDB_CODE_SUCCESS;
_return:
taosFreeQitem(pBuf);
return code;
}
static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
taosThreadMutexLock(&pDeleter->mutex);
(void)taosThreadMutexLock(&pDeleter->mutex);
pDeleter->queryEnd = true;
pDeleter->useconds = useconds;
taosThreadMutexUnlock(&pDeleter->mutex);
(void)taosThreadMutexUnlock(&pDeleter->mutex);
}
static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRawLen, bool* pQueryEnd) {
@ -165,9 +187,9 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRaw
}
SDataDeleterBuf* pBuf = NULL;
taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
(void)taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
if (pBuf != NULL) {
memcpy(&pDeleter->nextOutput, pBuf, sizeof(SDataDeleterBuf));
TAOS_MEMCPY(&pDeleter->nextOutput, pBuf, sizeof(SDataDeleterBuf));
taosFreeQitem(pBuf);
}
@ -192,35 +214,35 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
}
SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDeleter->nextOutput.pData);
memcpy(pOutput->pData, pEntry->data, pEntry->dataLen);
TAOS_MEMCPY(pOutput->pData, pEntry->data, pEntry->dataLen);
pDeleter->pParam->pUidList = NULL;
pOutput->numOfRows = pEntry->numOfRows;
pOutput->numOfCols = pEntry->numOfCols;
pOutput->compressed = pEntry->compressed;
atomic_sub_fetch_64(&pDeleter->cachedSize, pEntry->dataLen);
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
(void)atomic_sub_fetch_64(&pDeleter->cachedSize, pEntry->dataLen);
(void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
taosMemoryFreeClear(pDeleter->nextOutput.pData); // todo persistent
pOutput->bufStatus = updateStatus(pDeleter);
taosThreadMutexLock(&pDeleter->mutex);
(void)taosThreadMutexLock(&pDeleter->mutex);
pOutput->queryEnd = pDeleter->queryEnd;
pOutput->useconds = pDeleter->useconds;
pOutput->precision = pDeleter->pSchema->precision;
taosThreadMutexUnlock(&pDeleter->mutex);
(void)taosThreadMutexUnlock(&pDeleter->mutex);
return TSDB_CODE_SUCCESS;
}
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDeleter->cachedSize);
(void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDeleter->cachedSize);
taosMemoryFreeClear(pDeleter->nextOutput.pData);
taosArrayDestroy(pDeleter->pParam->pUidList);
taosMemoryFree(pDeleter->pParam);
while (!taosQueueEmpty(pDeleter->pDataBlocks)) {
SDataDeleterBuf* pBuf = NULL;
taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
(void)taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
if (pBuf != NULL) {
taosMemoryFreeClear(pBuf->pData);
@ -228,9 +250,10 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
}
}
taosCloseQueue(pDeleter->pDataBlocks);
taosThreadMutexDestroy(&pDeleter->mutex);
(void)taosThreadMutexDestroy(&pDeleter->mutex);
taosMemoryFree(pDeleter->pManager);
return TSDB_CODE_SUCCESS;
}
@ -247,8 +270,8 @@ int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pData
SDataDeleterHandle* deleter = taosMemoryCalloc(1, sizeof(SDataDeleterHandle));
if (NULL == deleter) {
code = terrno;
taosMemoryFree(pParam);
code = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
@ -276,17 +299,22 @@ int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pData
if (code) {
goto _end;
}
taosThreadMutexInit(&deleter->mutex, NULL);
code = taosThreadMutexInit(&deleter->mutex, NULL);
if (code) {
goto _end;
}
*pHandle = deleter;
return code;
_end:
if (deleter != NULL) {
destroyDataSinker((SDataSinkHandle*)deleter);
(void)destroyDataSinker((SDataSinkHandle*)deleter);
taosMemoryFree(deleter);
} else {
taosMemoryFree(pManager);
}
return code;
}

View File

@ -63,7 +63,7 @@ typedef struct SDataDispatchHandle {
// The length of bitmap is decided by number of rows of this data block, and the length of each column data is
// recorded in the first segment, next to the struct header
// clang-format on
static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
static int32_t toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
int32_t numOfCols = 0;
SNode* pNode;
@ -88,6 +88,9 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn
if (pHandle->pCompressBuf == NULL) {
// allocate additional 8 bytes to avoid invalid write if compress failed to reduce the size
pHandle->pCompressBuf = taosMemoryMalloc(pBuf->allocSize + 8);
if (NULL == pHandle->pCompressBuf) {
QRY_RET(terrno);
}
pHandle->bufSize = pBuf->allocSize + 8;
} else {
if (pHandle->bufSize < pBuf->allocSize + 8) {
@ -96,9 +99,8 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn
if (p != NULL) {
pHandle->pCompressBuf = p;
} else {
terrno = TSDB_CODE_OUT_OF_MEMORY;
qError("failed to prepare compress buf:%d, code: out of memory", pHandle->bufSize);
return;
qError("failed to prepare compress buf:%d, code: %x", pHandle->bufSize, terrno);
return terrno;
}
}
}
@ -114,7 +116,7 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn
pEntry->compressed = 0;
pEntry->dataLen = dataLen;
pEntry->rawLen = dataLen;
memcpy(pEntry->data, pHandle->pCompressBuf, dataLen);
TAOS_MEMCPY(pEntry->data, pHandle->pCompressBuf, dataLen);
}
} else {
pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, numOfCols);
@ -124,11 +126,13 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn
pBuf->useSize += pEntry->dataLen;
atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen);
atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
(void)atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen);
(void)atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
return TSDB_CODE_SUCCESS;
}
static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) {
static int32_t allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) {
/*
uint32_t capacity = pDispatcher->pManager->cfg.maxDataBlockNumPerQuery;
if (taosQueueItemSize(pDispatcher->pDataBlocks) > capacity) {
@ -142,69 +146,73 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput,
pBuf->pData = taosMemoryMalloc(pBuf->allocSize);
if (pBuf->pData == NULL) {
qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno));
qError("SinkNode failed to malloc memory, size:%d, code:%x", pBuf->allocSize, terrno);
return terrno;
}
return NULL != pBuf->pData;
return TSDB_CODE_SUCCESS;
}
static int32_t updateStatus(SDataDispatchHandle* pDispatcher) {
taosThreadMutexLock(&pDispatcher->mutex);
(void)taosThreadMutexLock(&pDispatcher->mutex);
int32_t blockNums = taosQueueItemSize(pDispatcher->pDataBlocks);
int32_t status =
(0 == blockNums ? DS_BUF_EMPTY
: (blockNums < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL));
pDispatcher->status = status;
taosThreadMutexUnlock(&pDispatcher->mutex);
(void)taosThreadMutexUnlock(&pDispatcher->mutex);
return status;
}
static int32_t getStatus(SDataDispatchHandle* pDispatcher) {
taosThreadMutexLock(&pDispatcher->mutex);
(void)taosThreadMutexLock(&pDispatcher->mutex);
int32_t status = pDispatcher->status;
taosThreadMutexUnlock(&pDispatcher->mutex);
(void)taosThreadMutexUnlock(&pDispatcher->mutex);
return status;
}
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
int32_t code = 0;
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
SDataDispatchBuf* pBuf;
SDataDispatchBuf* pBuf = NULL;
code = taosAllocateQitem(sizeof(SDataDispatchBuf), DEF_QITEM, 0, (void**)&pBuf);
if (code) {
return code;
}
if (!allocBuf(pDispatcher, pInput, pBuf)) {
code = allocBuf(pDispatcher, pInput, pBuf);
if (code) {
taosFreeQitem(pBuf);
return TSDB_CODE_OUT_OF_MEMORY;
}
toDataCacheEntry(pDispatcher, pInput, pBuf);
code = taosWriteQitem(pDispatcher->pDataBlocks, pBuf);
if (code != 0) {
return code;
}
QRY_ERR_JRET(toDataCacheEntry(pDispatcher, pInput, pBuf));
QRY_ERR_JRET(taosWriteQitem(pDispatcher->pDataBlocks, pBuf));
int32_t status = updateStatus(pDispatcher);
*pContinue = (status == DS_BUF_LOW || status == DS_BUF_EMPTY);
return TSDB_CODE_SUCCESS;
_return:
taosFreeQitem(pBuf);
return code;
}
static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
taosThreadMutexLock(&pDispatcher->mutex);
(void)taosThreadMutexLock(&pDispatcher->mutex);
pDispatcher->queryEnd = true;
pDispatcher->useconds = useconds;
taosThreadMutexUnlock(&pDispatcher->mutex);
(void)taosThreadMutexUnlock(&pDispatcher->mutex);
}
static void resetDispatcher(struct SDataSinkHandle* pHandle) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
taosThreadMutexLock(&pDispatcher->mutex);
(void)taosThreadMutexLock(&pDispatcher->mutex);
pDispatcher->queryEnd = false;
taosThreadMutexUnlock(&pDispatcher->mutex);
(void)taosThreadMutexUnlock(&pDispatcher->mutex);
}
static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRowLen, bool* pQueryEnd) {
@ -216,9 +224,9 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRow
}
SDataDispatchBuf* pBuf = NULL;
taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
(void)taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
if (pBuf != NULL) {
memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
TAOS_MEMCPY(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
taosFreeQitem(pBuf);
}
@ -243,33 +251,34 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
}
SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData);
memcpy(pOutput->pData, pEntry->data, pEntry->dataLen);
TAOS_MEMCPY(pOutput->pData, pEntry->data, pEntry->dataLen);
pOutput->numOfRows = pEntry->numOfRows;
pOutput->numOfCols = pEntry->numOfCols;
pOutput->compressed = pEntry->compressed;
atomic_sub_fetch_64(&pDispatcher->cachedSize, pEntry->dataLen);
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
(void)atomic_sub_fetch_64(&pDispatcher->cachedSize, pEntry->dataLen);
(void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
taosMemoryFreeClear(pDispatcher->nextOutput.pData); // todo persistent
pOutput->bufStatus = updateStatus(pDispatcher);
taosThreadMutexLock(&pDispatcher->mutex);
(void)taosThreadMutexLock(&pDispatcher->mutex);
pOutput->queryEnd = pDispatcher->queryEnd;
pOutput->useconds = pDispatcher->useconds;
pOutput->precision = pDispatcher->pSchema->precision;
taosThreadMutexUnlock(&pDispatcher->mutex);
(void)taosThreadMutexUnlock(&pDispatcher->mutex);
return TSDB_CODE_SUCCESS;
}
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDispatcher->cachedSize);
(void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDispatcher->cachedSize);
taosMemoryFreeClear(pDispatcher->nextOutput.pData);
while (!taosQueueEmpty(pDispatcher->pDataBlocks)) {
SDataDispatchBuf* pBuf = NULL;
taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
(void)taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
if (pBuf != NULL) {
taosMemoryFreeClear(pBuf->pData);
taosFreeQitem(pBuf);
@ -280,7 +289,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
taosMemoryFreeClear(pDispatcher->pCompressBuf);
pDispatcher->bufSize = 0;
taosThreadMutexDestroy(&pDispatcher->mutex);
(void)taosThreadMutexDestroy(&pDispatcher->mutex);
taosMemoryFree(pDispatcher->pManager);
return TSDB_CODE_SUCCESS;
}
@ -297,7 +306,6 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD
SDataDispatchHandle* dispatcher = taosMemoryCalloc(1, sizeof(SDataDispatchHandle));
if (NULL == dispatcher) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _return;
}
@ -318,7 +326,11 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD
terrno = code;
goto _return;
}
taosThreadMutexInit(&dispatcher->mutex, NULL);
code = taosThreadMutexInit(&dispatcher->mutex, NULL);
if (code) {
terrno = code;
goto _return;
}
if (NULL == dispatcher->pDataBlocks) {
taosMemoryFree(dispatcher);
@ -330,6 +342,7 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD
return TSDB_CODE_SUCCESS;
_return:
taosMemoryFree(pManager);
return terrno;
}

View File

@ -58,10 +58,17 @@ int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) {
SSubmitRspParam* pParam = (SSubmitRspParam*)param;
SDataInserterHandle* pInserter = pParam->pInserter;
pInserter->submitRes.code = code;
if (code) {
pInserter->submitRes.code = code;
}
if (code == TSDB_CODE_SUCCESS) {
pInserter->submitRes.pRsp = taosMemoryCalloc(1, sizeof(SSubmitRsp2));
if (NULL == pInserter->submitRes.pRsp) {
pInserter->submitRes.code = terrno;
goto _return;
}
SDecoder coder = {0};
tDecoderInit(&coder, pMsg->pData, pMsg->len);
code = tDecodeSSubmitRsp2(&coder, pInserter->submitRes.pRsp);
@ -77,6 +84,10 @@ int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) {
for (int32_t i = 0; i < numOfTables; ++i) {
SVCreateTbRsp* pRsp = taosArrayGet(pCreateTbList, i);
if (NULL == pRsp) {
pInserter->submitRes.code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
goto _return;
}
if (TSDB_CODE_SUCCESS != pRsp->code) {
code = pRsp->code;
taosMemoryFree(pInserter->submitRes.pRsp);
@ -94,8 +105,10 @@ int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) {
}
_return:
tsem_post(&pInserter->ready);
(void)tsem_post(&pInserter->ready);
taosMemoryFree(pMsg->pData);
return TSDB_CODE_SUCCESS;
}
@ -105,11 +118,15 @@ static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, void* pMsg, int
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
taosMemoryFreeClear(pMsg);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
SSubmitRspParam* pParam = taosMemoryCalloc(1, sizeof(SSubmitRspParam));
if (NULL == pParam) {
taosMemoryFreeClear(pMsg);
taosMemoryFreeClear(pMsgSendInfo);
return terrno;
}
pParam->pInserter = pInserter;
pMsgSendInfo->param = pParam;
@ -133,7 +150,7 @@ static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int
len += sizeof(SSubmitReq2Msg);
pBuf = taosMemoryMalloc(len);
if (NULL == pBuf) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
((SSubmitReq2Msg*)pBuf)->header.vgId = htonl(vgId);
((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len);
@ -149,6 +166,7 @@ static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int
} else {
taosMemoryFree(pBuf);
}
return code;
}
@ -162,12 +180,10 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
if (NULL == pReq) {
if (!(pReq = taosMemoryMalloc(sizeof(SSubmitReq2)))) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
}
@ -208,6 +224,10 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
}
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, colIdx);
if (NULL == pColInfoData) {
terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
goto _end;
}
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
switch (pColInfoData->info.type) {
@ -217,13 +237,17 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
ASSERT(pColInfoData->info.type == pCol->type);
if (colDataIsNull_s(pColInfoData, j)) {
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
taosArrayPush(pVals, &cv);
if (NULL == taosArrayPush(pVals, &cv)) {
goto _end;
}
} else {
void* data = colDataGetVarData(pColInfoData, j);
SValue sv = (SValue){
.type = pCol->type, .nData = varDataLen(data), .pData = varDataVal(data)}; // address copy, no value
SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
taosArrayPush(pVals, &cv);
if (NULL == taosArrayPush(pVals, &cv)) {
goto _end;
}
}
break;
}
@ -245,7 +269,9 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
}
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); // should use pCol->type
taosArrayPush(pVals, &cv);
if (NULL == taosArrayPush(pVals, &cv)) {
goto _end;
}
} else {
if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId && !needSortMerge) {
if (*(int64_t*)var <= lastTs) {
@ -256,9 +282,11 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
}
SValue sv = {.type = pCol->type};
memcpy(&sv.val, var, tDataTypes[pCol->type].bytes);
TAOS_MEMCPY(&sv.val, var, tDataTypes[pCol->type].bytes);
SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
taosArrayPush(pVals, &cv);
if (NULL == taosArrayPush(pVals, &cv)) {
goto _end;
}
}
} else {
uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
@ -274,7 +302,9 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
goto _end;
}
taosArrayPush(tbData.aRowP, &pRow);
if (NULL == taosArrayPush(tbData.aRowP, &pRow)) {
goto _end;
}
}
if (needSortMerge) {
@ -284,9 +314,12 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
}
}
taosArrayPush(pReq->aSubmitTbData, &tbData);
if (NULL == taosArrayPush(pReq->aSubmitTbData, &tbData)) {
goto _end;
}
_end:
taosArrayDestroy(pVals);
if (terrno != 0) {
*ppReq = NULL;
@ -294,9 +327,11 @@ _end:
tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq);
}
return terrno;
}
*ppReq = pReq;
return TSDB_CODE_SUCCESS;
}
@ -312,7 +347,9 @@ int32_t dataBlocksToSubmitReq(SDataInserterHandle* pInserter, void** pMsg, int32
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i);
if (NULL == pDataBlock) {
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
code = buildSubmitReqFromBlock(pInserter, &pReq, pDataBlock, pTSchema, uid, vgId, suid);
if (code) {
if (pReq) {
@ -334,7 +371,9 @@ int32_t dataBlocksToSubmitReq(SDataInserterHandle* pInserter, void** pMsg, int32
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
if (!pInserter->explain) {
taosArrayPush(pInserter->pDataBlocks, &pInput->pData);
if (NULL == taosArrayPush(pInserter->pDataBlocks, &pInput->pData)) {
return terrno;
}
void* pMsg = NULL;
int32_t msgLen = 0;
int32_t code = dataBlocksToSubmitReq(pInserter, &pMsg, &msgLen);
@ -350,7 +389,7 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput,
return code;
}
tsem_wait(&pInserter->ready);
QRY_ERR_RET(tsem_wait(&pInserter->ready));
if (pInserter->submitRes.code) {
return pInserter->submitRes.code;
@ -364,10 +403,10 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput,
static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
taosThreadMutexLock(&pInserter->mutex);
(void)taosThreadMutexLock(&pInserter->mutex);
pInserter->queryEnd = true;
pInserter->useconds = useconds;
taosThreadMutexUnlock(&pInserter->mutex);
(void)taosThreadMutexUnlock(&pInserter->mutex);
}
static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRawLen, bool* pQueryEnd) {
@ -378,12 +417,12 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRaw
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pInserter->cachedSize);
(void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pInserter->cachedSize);
taosArrayDestroy(pInserter->pDataBlocks);
taosMemoryFree(pInserter->pSchema);
taosMemoryFree(pInserter->pParam);
taosHashCleanup(pInserter->pCols);
taosThreadMutexDestroy(&pInserter->mutex);
(void)taosThreadMutexDestroy(&pInserter->mutex);
taosMemoryFree(pInserter->pManager);
return TSDB_CODE_SUCCESS;
@ -401,7 +440,6 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
SDataInserterHandle* inserter = taosMemoryCalloc(1, sizeof(SDataInserterHandle));
if (NULL == inserter) {
taosMemoryFree(pParam);
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _return;
}
@ -432,28 +470,31 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
}
inserter->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
taosThreadMutexInit(&inserter->mutex, NULL);
if (NULL == inserter->pDataBlocks) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _return;
}
QRY_ERR_JRET(taosThreadMutexInit(&inserter->mutex, NULL));
inserter->fullOrderColList = pInserterNode->pCols->length == inserter->pSchema->numOfCols;
inserter->pCols = taosHashInit(pInserterNode->pCols->length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT),
false, HASH_NO_LOCK);
if (NULL == inserter->pCols) {
goto _return;
}
SNode* pNode = NULL;
int32_t i = 0;
FOREACH(pNode, pInserterNode->pCols) {
SColumnNode* pCol = (SColumnNode*)pNode;
taosHashPut(inserter->pCols, &pCol->colId, sizeof(pCol->colId), &pCol->slotId, sizeof(pCol->slotId));
QRY_ERR_JRET(taosHashPut(inserter->pCols, &pCol->colId, sizeof(pCol->colId), &pCol->slotId, sizeof(pCol->slotId)));
if (inserter->fullOrderColList && pCol->colId != inserter->pSchema->columns[i].colId) {
inserter->fullOrderColList = false;
}
++i;
}
tsem_init(&inserter->ready, 0, 0);
QRY_ERR_JRET(tsem_init(&inserter->ready, 0, 0));
*pHandle = inserter;
return TSDB_CODE_SUCCESS;
@ -461,7 +502,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
_return:
if (inserter) {
destroyDataSinker((SDataSinkHandle*)inserter);
(void)destroyDataSinker((SDataSinkHandle*)inserter);
taosMemoryFree(inserter);
} else {
taosMemoryFree(pManager);

View File

@ -23,20 +23,20 @@ SDataSinkStat gDataSinkStat = {0};
int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg, SStorageAPI* pAPI, void** ppSinkManager) {
SDataSinkManager* pSinkManager = taosMemoryMalloc(sizeof(SDataSinkManager));
if (NULL == pSinkManager) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
pSinkManager->cfg = *cfg;
pSinkManager->pAPI = pAPI;
*ppSinkManager = pSinkManager;
return 0; // to avoid compiler eror
return TSDB_CODE_SUCCESS; // to avoid compiler eror
}
int32_t dsDataSinkGetCacheSize(SDataSinkStat* pStat) {
pStat->cachedSize = atomic_load_64(&gDataSinkStat.cachedSize);
return 0;
return TSDB_CODE_SUCCESS;
}
int32_t dsCreateDataSinker(void* pSinkManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id) {
@ -56,6 +56,7 @@ int32_t dsCreateDataSinker(void* pSinkManager, const SDataSinkNode* pDataSink, D
taosMemoryFree(pSinkManager);
qError("invalid input node type:%d, %s", nodeType(pDataSink), id);
return TSDB_CODE_QRY_INVALID_INPUT;
}
@ -97,6 +98,6 @@ void dsScheduleProcess(void* ahandle, void* pItem) {
void dsDestroyDataSinker(DataSinkHandle handle) {
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
pHandleImpl->fDestroy(pHandleImpl);
(void)pHandleImpl->fDestroy(pHandleImpl);
taosMemoryFree(pHandleImpl);
}