fix delete
This commit is contained in:
parent
e2586979eb
commit
367552b914
|
@ -832,7 +832,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
|
||||||
tDecoderClear(pCoder);
|
tDecoderClear(pCoder);
|
||||||
|
|
||||||
int32_t sz = taosArrayGetSize(pRes->uidList);
|
int32_t sz = taosArrayGetSize(pRes->uidList);
|
||||||
if (sz == 0) {
|
if (sz == 0 || pRes->affectedRows == 0) {
|
||||||
taosArrayDestroy(pRes->uidList);
|
taosArrayDestroy(pRes->uidList);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -79,25 +79,33 @@ static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInp
|
||||||
pEntry->dataLen = sizeof(SDeleterRes);
|
pEntry->dataLen = sizeof(SDeleterRes);
|
||||||
|
|
||||||
ASSERT(1 == pEntry->numOfRows);
|
ASSERT(1 == pEntry->numOfRows);
|
||||||
ASSERT(1 == pEntry->numOfCols);
|
ASSERT(3 == pEntry->numOfCols);
|
||||||
|
|
||||||
pBuf->useSize = sizeof(SDataCacheEntry);
|
pBuf->useSize = sizeof(SDataCacheEntry);
|
||||||
|
|
||||||
SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 0);
|
SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 0);
|
||||||
|
SColumnInfoData* pColSKey = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 1);
|
||||||
|
SColumnInfoData* pColEKey = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 2);
|
||||||
|
|
||||||
SDeleterRes* pRes = (SDeleterRes*)pEntry->data;
|
SDeleterRes* pRes = (SDeleterRes*)pEntry->data;
|
||||||
pRes->suid = pHandle->pParam->suid;
|
pRes->suid = pHandle->pParam->suid;
|
||||||
pRes->uidList = pHandle->pParam->pUidList;
|
pRes->uidList = pHandle->pParam->pUidList;
|
||||||
pRes->skey = pHandle->pDeleter->deleteTimeRange.skey;
|
|
||||||
pRes->ekey = pHandle->pDeleter->deleteTimeRange.ekey;
|
|
||||||
strcpy(pRes->tableName, pHandle->pDeleter->tableFName);
|
strcpy(pRes->tableName, pHandle->pDeleter->tableFName);
|
||||||
strcpy(pRes->tsColName, pHandle->pDeleter->tsColName);
|
strcpy(pRes->tsColName, pHandle->pDeleter->tsColName);
|
||||||
pRes->affectedRows = *(int64_t*)pColRes->pData;
|
pRes->affectedRows = *(int64_t*)pColRes->pData;
|
||||||
|
if (pRes->affectedRows) {
|
||||||
|
pRes->skey = *(int64_t*)pColSKey->pData;
|
||||||
|
pRes->ekey = *(int64_t*)pColEKey->pData;
|
||||||
|
ASSERT(pRes->skey <= pRes->ekey);
|
||||||
|
} else {
|
||||||
|
pRes->skey = pHandle->pDeleter->deleteTimeRange.skey;
|
||||||
|
pRes->ekey = pHandle->pDeleter->deleteTimeRange.ekey;
|
||||||
|
}
|
||||||
|
|
||||||
pBuf->useSize += pEntry->dataLen;
|
pBuf->useSize += pEntry->dataLen;
|
||||||
|
|
||||||
atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen);
|
atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen);
|
||||||
atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
|
atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool allocBuf(SDataDeleterHandle* pDeleter, const SInputData* pInput, SDataDeleterBuf* pBuf) {
|
static bool allocBuf(SDataDeleterHandle* pDeleter, const SInputData* pInput, SDataDeleterBuf* pBuf) {
|
||||||
|
@ -172,7 +180,8 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryE
|
||||||
SDataCacheEntry* pEntry = (SDataCacheEntry*)pDeleter->nextOutput.pData;
|
SDataCacheEntry* pEntry = (SDataCacheEntry*)pDeleter->nextOutput.pData;
|
||||||
*pLen = pEntry->dataLen;
|
*pLen = pEntry->dataLen;
|
||||||
*pQueryEnd = pDeleter->queryEnd;
|
*pQueryEnd = pDeleter->queryEnd;
|
||||||
qDebug("got data len %" PRId64 ", row num %d in sink", *pLen, ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->numOfRows);
|
qDebug("got data len %" PRId64 ", row num %d in sink", *pLen,
|
||||||
|
((SDataCacheEntry*)(pDeleter->nextOutput.pData))->numOfRows);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
|
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
|
||||||
|
@ -186,14 +195,14 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDeleter->nextOutput.pData);
|
SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDeleter->nextOutput.pData);
|
||||||
memcpy(pOutput->pData, pEntry->data, pEntry->dataLen);
|
memcpy(pOutput->pData, pEntry->data, pEntry->dataLen);
|
||||||
pDeleter->pParam->pUidList = NULL;
|
pDeleter->pParam->pUidList = NULL;
|
||||||
pOutput->numOfRows = pEntry->numOfRows;
|
pOutput->numOfRows = pEntry->numOfRows;
|
||||||
pOutput->numOfCols = pEntry->numOfCols;
|
pOutput->numOfCols = pEntry->numOfCols;
|
||||||
pOutput->compressed = pEntry->compressed;
|
pOutput->compressed = pEntry->compressed;
|
||||||
|
|
||||||
atomic_sub_fetch_64(&pDeleter->cachedSize, pEntry->dataLen);
|
atomic_sub_fetch_64(&pDeleter->cachedSize, pEntry->dataLen);
|
||||||
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
|
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
|
||||||
|
|
||||||
taosMemoryFreeClear(pDeleter->nextOutput.pData); // todo persistent
|
taosMemoryFreeClear(pDeleter->nextOutput.pData); // todo persistent
|
||||||
pOutput->bufStatus = updateStatus(pDeleter);
|
pOutput->bufStatus = updateStatus(pDeleter);
|
||||||
|
@ -202,7 +211,7 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
|
||||||
pOutput->useconds = pDeleter->useconds;
|
pOutput->useconds = pDeleter->useconds;
|
||||||
pOutput->precision = pDeleter->pSchema->precision;
|
pOutput->precision = pDeleter->pSchema->precision;
|
||||||
taosThreadMutexUnlock(&pDeleter->mutex);
|
taosThreadMutexUnlock(&pDeleter->mutex);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -211,7 +220,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
|
||||||
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDeleter->cachedSize);
|
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDeleter->cachedSize);
|
||||||
taosMemoryFreeClear(pDeleter->nextOutput.pData);
|
taosMemoryFreeClear(pDeleter->nextOutput.pData);
|
||||||
taosArrayDestroy(pDeleter->pParam->pUidList);
|
taosArrayDestroy(pDeleter->pParam->pUidList);
|
||||||
taosMemoryFree(pDeleter->pParam);
|
taosMemoryFree(pDeleter->pParam);
|
||||||
while (!taosQueueEmpty(pDeleter->pDataBlocks)) {
|
while (!taosQueueEmpty(pDeleter->pDataBlocks)) {
|
||||||
SDataDeleterBuf* pBuf = NULL;
|
SDataDeleterBuf* pBuf = NULL;
|
||||||
taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
|
taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
|
||||||
|
@ -230,14 +239,15 @@ static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void *pParam) {
|
int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle,
|
||||||
|
void* pParam) {
|
||||||
SDataDeleterHandle* deleter = taosMemoryCalloc(1, sizeof(SDataDeleterHandle));
|
SDataDeleterHandle* deleter = taosMemoryCalloc(1, sizeof(SDataDeleterHandle));
|
||||||
if (NULL == deleter) {
|
if (NULL == deleter) {
|
||||||
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
SDataDeleterNode* pDeleterNode = (SDataDeleterNode *)pDataSink;
|
SDataDeleterNode* pDeleterNode = (SDataDeleterNode*)pDataSink;
|
||||||
deleter->sink.fPut = putDataBlock;
|
deleter->sink.fPut = putDataBlock;
|
||||||
deleter->sink.fEndPut = endPut;
|
deleter->sink.fEndPut = endPut;
|
||||||
deleter->sink.fGetLen = getDataLength;
|
deleter->sink.fGetLen = getDataLength;
|
||||||
|
|
Loading…
Reference in New Issue