From 1c82fe86bdd5a9e09c5a6ad1c0234549d6fb6abf Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 12 Nov 2019 15:51:49 +0800 Subject: [PATCH] Fix more bugs --- src/system/detail/src/vnodeImport.c | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/system/detail/src/vnodeImport.c b/src/system/detail/src/vnodeImport.c index d774a55f38..21e30a5917 100644 --- a/src/system/detail/src/vnodeImport.c +++ b/src/system/detail/src/vnodeImport.c @@ -1465,7 +1465,7 @@ int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int ((!isCacheIterEnd) && (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) > KEY_AT_INDEX(pInfo->cacheBlocks[cacheIter.slot]->offset[0], sizeof(TSKEY), - cacheIter.pos)))) { // if (payload end || (cacheIter not end && payloadKey > blockKey)) + cacheIter.pos)))) { // if (payload end || (cacheIter not end && payloadKey > blockKey)), consume cache for (int col = 0; col < pObj->numOfColumns; col++) { memcpy(pBuffer->offset[col] + pObj->schema[col].bytes * pBuffer->epos, pInfo->cacheBlocks[cacheIter.slot]->offset[col] + pObj->schema[col].bytes * cacheIter.pos, @@ -1477,18 +1477,23 @@ int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int ((payloadIter < rows) && (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) < KEY_AT_INDEX(pInfo->cacheBlocks[cacheIter.slot]->offset[0], sizeof(TSKEY), - cacheIter.pos)))) { // cacheIter end || (payloadIter not end && payloadKey < blockKey) + cacheIter.pos)))) { // cacheIter end || (payloadIter not end && payloadKey < blockKey), consume payload if (availPoints == 0) { // Need to allocate a new cache block pthread_mutex_lock(&(pPool->vmutex)); + // TODO: Need to check if there are enough slots to hold a new one SCacheBlock *pNewBlock = vnodeGetFreeCacheBlock(pVnode); - if (pNewBlock == NULL) { // Failed to allocate a new cache block + if (pNewBlock == NULL) { // Failed to allocate a new cache block, need to commit and loop over the remaining cache records pthread_mutex_unlock(&(pPool->vmutex)); payloadIter = rows; code = TSDB_CODE_ACTION_IN_PROGRESS; pImport->commit = 1; - // TODO: Fix here continue; } + + assert(pInfo->numOfBlocks <= pInfo->maxBlocks); + if (pInfo->numOfBlocks == pInfo->maxBlocks) { + vnodeFreeCacheBlock(pInfo->cacheBlocks[(pInfo->currentSlot + 1) % pInfo->maxBlocks]); + } pNewBlock->pMeterObj = pObj; pNewBlock->offset[0] = (char *)pNewBlock + sizeof(SCacheBlock) + sizeof(char *) * pObj->numOfColumns; @@ -1593,6 +1598,7 @@ int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int } } pImport->importedRows += rowsImported; + __sync_fetch_and_sub(&(pObj->freePoints), rowsImported); code = 0;