From 367552b9146d8d8053baea29d44d24a6480bc940 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 15 Sep 2022 10:08:48 +0800 Subject: [PATCH] fix delete --- source/dnode/vnode/src/tq/tq.c | 2 +- source/libs/executor/src/dataDeleter.c | 38 ++++++++++++++++---------- 2 files changed, 25 insertions(+), 15 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f9c2757c37..c8841e5e16 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -832,7 +832,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { tDecoderClear(pCoder); int32_t sz = taosArrayGetSize(pRes->uidList); - if (sz == 0) { + if (sz == 0 || pRes->affectedRows == 0) { taosArrayDestroy(pRes->uidList); return 0; } diff --git a/source/libs/executor/src/dataDeleter.c b/source/libs/executor/src/dataDeleter.c index 40198615ea..55a1a1fdb9 100644 --- a/source/libs/executor/src/dataDeleter.c +++ b/source/libs/executor/src/dataDeleter.c @@ -79,25 +79,33 @@ static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInp pEntry->dataLen = sizeof(SDeleterRes); ASSERT(1 == pEntry->numOfRows); - ASSERT(1 == pEntry->numOfCols); + ASSERT(3 == pEntry->numOfCols); pBuf->useSize = sizeof(SDataCacheEntry); SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 0); + SColumnInfoData* pColSKey = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 1); + SColumnInfoData* pColEKey = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 2); SDeleterRes* pRes = (SDeleterRes*)pEntry->data; pRes->suid = pHandle->pParam->suid; pRes->uidList = pHandle->pParam->pUidList; - pRes->skey = pHandle->pDeleter->deleteTimeRange.skey; - pRes->ekey = pHandle->pDeleter->deleteTimeRange.ekey; strcpy(pRes->tableName, pHandle->pDeleter->tableFName); strcpy(pRes->tsColName, pHandle->pDeleter->tsColName); pRes->affectedRows = *(int64_t*)pColRes->pData; + if (pRes->affectedRows) { + pRes->skey = *(int64_t*)pColSKey->pData; + pRes->ekey = *(int64_t*)pColEKey->pData; + ASSERT(pRes->skey <= pRes->ekey); + } else { + pRes->skey = pHandle->pDeleter->deleteTimeRange.skey; + pRes->ekey = pHandle->pDeleter->deleteTimeRange.ekey; + } pBuf->useSize += pEntry->dataLen; - - atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen); - atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen); + + atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen); + atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen); } static bool allocBuf(SDataDeleterHandle* pDeleter, const SInputData* pInput, SDataDeleterBuf* pBuf) { @@ -172,7 +180,8 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryE SDataCacheEntry* pEntry = (SDataCacheEntry*)pDeleter->nextOutput.pData; *pLen = pEntry->dataLen; *pQueryEnd = pDeleter->queryEnd; - qDebug("got data len %" PRId64 ", row num %d in sink", *pLen, ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->numOfRows); + qDebug("got data len %" PRId64 ", row num %d in sink", *pLen, + ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->numOfRows); } static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { @@ -186,14 +195,14 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { return TSDB_CODE_SUCCESS; } SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDeleter->nextOutput.pData); - memcpy(pOutput->pData, pEntry->data, pEntry->dataLen); + memcpy(pOutput->pData, pEntry->data, pEntry->dataLen); pDeleter->pParam->pUidList = NULL; pOutput->numOfRows = pEntry->numOfRows; pOutput->numOfCols = pEntry->numOfCols; pOutput->compressed = pEntry->compressed; - atomic_sub_fetch_64(&pDeleter->cachedSize, pEntry->dataLen); - atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen); + atomic_sub_fetch_64(&pDeleter->cachedSize, pEntry->dataLen); + atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen); taosMemoryFreeClear(pDeleter->nextOutput.pData); // todo persistent pOutput->bufStatus = updateStatus(pDeleter); @@ -202,7 +211,7 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { pOutput->useconds = pDeleter->useconds; pOutput->precision = pDeleter->pSchema->precision; taosThreadMutexUnlock(&pDeleter->mutex); - + return TSDB_CODE_SUCCESS; } @@ -211,7 +220,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDeleter->cachedSize); taosMemoryFreeClear(pDeleter->nextOutput.pData); taosArrayDestroy(pDeleter->pParam->pUidList); - taosMemoryFree(pDeleter->pParam); + taosMemoryFree(pDeleter->pParam); while (!taosQueueEmpty(pDeleter->pDataBlocks)) { SDataDeleterBuf* pBuf = NULL; taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf); @@ -230,14 +239,15 @@ static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) { return TSDB_CODE_SUCCESS; } -int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void *pParam) { +int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, + void* pParam) { SDataDeleterHandle* deleter = taosMemoryCalloc(1, sizeof(SDataDeleterHandle)); if (NULL == deleter) { terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY; } - SDataDeleterNode* pDeleterNode = (SDataDeleterNode *)pDataSink; + SDataDeleterNode* pDeleterNode = (SDataDeleterNode*)pDataSink; deleter->sink.fPut = putDataBlock; deleter->sink.fEndPut = endPut; deleter->sink.fGetLen = getDataLength;