Fix more bugs
This commit is contained in:
parent
6e48ca27d3
commit
1c82fe86bd
|
@ -1465,7 +1465,7 @@ int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int
|
||||||
((!isCacheIterEnd) &&
|
((!isCacheIterEnd) &&
|
||||||
(KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) >
|
(KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) >
|
||||||
KEY_AT_INDEX(pInfo->cacheBlocks[cacheIter.slot]->offset[0], sizeof(TSKEY),
|
KEY_AT_INDEX(pInfo->cacheBlocks[cacheIter.slot]->offset[0], sizeof(TSKEY),
|
||||||
cacheIter.pos)))) { // if (payload end || (cacheIter not end && payloadKey > blockKey))
|
cacheIter.pos)))) { // if (payload end || (cacheIter not end && payloadKey > blockKey)), consume cache
|
||||||
for (int col = 0; col < pObj->numOfColumns; col++) {
|
for (int col = 0; col < pObj->numOfColumns; col++) {
|
||||||
memcpy(pBuffer->offset[col] + pObj->schema[col].bytes * pBuffer->epos,
|
memcpy(pBuffer->offset[col] + pObj->schema[col].bytes * pBuffer->epos,
|
||||||
pInfo->cacheBlocks[cacheIter.slot]->offset[col] + pObj->schema[col].bytes * cacheIter.pos,
|
pInfo->cacheBlocks[cacheIter.slot]->offset[col] + pObj->schema[col].bytes * cacheIter.pos,
|
||||||
|
@ -1477,19 +1477,24 @@ int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int
|
||||||
((payloadIter < rows) &&
|
((payloadIter < rows) &&
|
||||||
(KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) <
|
(KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) <
|
||||||
KEY_AT_INDEX(pInfo->cacheBlocks[cacheIter.slot]->offset[0], sizeof(TSKEY),
|
KEY_AT_INDEX(pInfo->cacheBlocks[cacheIter.slot]->offset[0], sizeof(TSKEY),
|
||||||
cacheIter.pos)))) { // cacheIter end || (payloadIter not end && payloadKey < blockKey)
|
cacheIter.pos)))) { // cacheIter end || (payloadIter not end && payloadKey < blockKey), consume payload
|
||||||
if (availPoints == 0) { // Need to allocate a new cache block
|
if (availPoints == 0) { // Need to allocate a new cache block
|
||||||
pthread_mutex_lock(&(pPool->vmutex));
|
pthread_mutex_lock(&(pPool->vmutex));
|
||||||
|
// TODO: Need to check if there are enough slots to hold a new one
|
||||||
SCacheBlock *pNewBlock = vnodeGetFreeCacheBlock(pVnode);
|
SCacheBlock *pNewBlock = vnodeGetFreeCacheBlock(pVnode);
|
||||||
if (pNewBlock == NULL) { // Failed to allocate a new cache block
|
if (pNewBlock == NULL) { // Failed to allocate a new cache block, need to commit and loop over the remaining cache records
|
||||||
pthread_mutex_unlock(&(pPool->vmutex));
|
pthread_mutex_unlock(&(pPool->vmutex));
|
||||||
payloadIter = rows;
|
payloadIter = rows;
|
||||||
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
pImport->commit = 1;
|
pImport->commit = 1;
|
||||||
// TODO: Fix here
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
assert(pInfo->numOfBlocks <= pInfo->maxBlocks);
|
||||||
|
if (pInfo->numOfBlocks == pInfo->maxBlocks) {
|
||||||
|
vnodeFreeCacheBlock(pInfo->cacheBlocks[(pInfo->currentSlot + 1) % pInfo->maxBlocks]);
|
||||||
|
}
|
||||||
|
|
||||||
pNewBlock->pMeterObj = pObj;
|
pNewBlock->pMeterObj = pObj;
|
||||||
pNewBlock->offset[0] = (char *)pNewBlock + sizeof(SCacheBlock) + sizeof(char *) * pObj->numOfColumns;
|
pNewBlock->offset[0] = (char *)pNewBlock + sizeof(SCacheBlock) + sizeof(char *) * pObj->numOfColumns;
|
||||||
for (int col = 1; col < pObj->numOfColumns; col++)
|
for (int col = 1; col < pObj->numOfColumns; col++)
|
||||||
|
@ -1593,6 +1598,7 @@ int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pImport->importedRows += rowsImported;
|
pImport->importedRows += rowsImported;
|
||||||
|
__sync_fetch_and_sub(&(pObj->freePoints), rowsImported);
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue