fix file state issue
This commit is contained in:
parent
797be89187
commit
9c4d34ede1
|
@ -655,18 +655,9 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal) {
|
static int32_t recoverStateRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
if (pPos->pRowBuff) {
|
|
||||||
if (pPos->needFree) {
|
|
||||||
code = recoverSessionRowBuff(pFileState, pPos);
|
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
|
||||||
}
|
|
||||||
(*pVal) = pPos->pRowBuff;
|
|
||||||
goto _end;
|
|
||||||
}
|
|
||||||
|
|
||||||
pPos->pRowBuff = getFreeBuff(pFileState);
|
pPos->pRowBuff = getFreeBuff(pFileState);
|
||||||
if (!pPos->pRowBuff) {
|
if (!pPos->pRowBuff) {
|
||||||
if (pFileState->curRowCount < pFileState->maxRowCount) {
|
if (pFileState->curRowCount < pFileState->maxRowCount) {
|
||||||
|
@ -687,6 +678,27 @@ int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void**
|
||||||
code = recoverSessionRowBuff(pFileState, pPos);
|
code = recoverSessionRowBuff(pFileState, pPos);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
|
_end:
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
int32_t lino = 0;
|
||||||
|
if (pPos->pRowBuff) {
|
||||||
|
if (pPos->needFree) {
|
||||||
|
code = recoverSessionRowBuff(pFileState, pPos);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
}
|
||||||
|
(*pVal) = pPos->pRowBuff;
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
|
||||||
|
recoverStateRowBuff(pFileState, pPos);
|
||||||
|
|
||||||
(*pVal) = pPos->pRowBuff;
|
(*pVal) = pPos->pRowBuff;
|
||||||
if (!pPos->needFree) {
|
if (!pPos->needFree) {
|
||||||
code = tdListPrepend(pFileState->usedBuffs, &pPos);
|
code = tdListPrepend(pFileState->usedBuffs, &pPos);
|
||||||
|
@ -1026,13 +1038,16 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
(*pWinCode) = TSDB_CODE_FAILED;
|
(*pWinCode) = TSDB_CODE_FAILED;
|
||||||
pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey));
|
pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey));
|
||||||
SRowBuffPos** pos = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen);
|
SRowBuffPos** ppPos = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen);
|
||||||
if (pos) {
|
if (ppPos) {
|
||||||
*pVLen = pFileState->rowSize;
|
*pVLen = pFileState->rowSize;
|
||||||
*pVal = *pos;
|
*pVal = *ppPos;
|
||||||
(*pos)->beUsed = true;
|
(*ppPos)->beUsed = true;
|
||||||
(*pos)->beFlushed = false;
|
(*ppPos)->beFlushed = false;
|
||||||
(*pWinCode) = TSDB_CODE_SUCCESS;
|
(*pWinCode) = TSDB_CODE_SUCCESS;
|
||||||
|
if ((*ppPos)->pRowBuff == NULL) {
|
||||||
|
recoverStateRowBuff(pFileState, *ppPos);
|
||||||
|
}
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
TSKEY ts = pFileState->getTs(pKey);
|
TSKEY ts = pFileState->getTs(pKey);
|
||||||
|
|
Loading…
Reference in New Issue