fix issue
This commit is contained in:
parent
9d384bc438
commit
1b285f8a1d
|
@ -991,6 +991,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
winCode = streamStateGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, pNewPos->pKey, (const void**)&pVal, &vlen);
|
winCode = streamStateGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, pNewPos->pKey, (const void**)&pVal, &vlen);
|
||||||
|
qDebug("===stream=== get state by cur winres:%d. %s", winCode, __func__);
|
||||||
if (winCode != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) {
|
if (winCode != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) {
|
||||||
destroyRowBuffPos(pNewPos);
|
destroyRowBuffPos(pNewPos);
|
||||||
SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
|
SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
|
||||||
|
@ -1007,6 +1008,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
|
||||||
memcpy(pNewPos->pRowBuff, pVal, vlen);
|
memcpy(pNewPos->pRowBuff, pVal, vlen);
|
||||||
taosMemoryFreeClear(pVal);
|
taosMemoryFreeClear(pVal);
|
||||||
pNewPos->beFlushed = true;
|
pNewPos->beFlushed = true;
|
||||||
|
qDebug("===stream=== read checkpoint state from disc. %s", __func__);
|
||||||
code = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
|
code = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
destroyRowBuffPos(pNewPos);
|
destroyRowBuffPos(pNewPos);
|
||||||
|
@ -1077,6 +1079,7 @@ int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId) {
|
||||||
int32_t vlen = 0;
|
int32_t vlen = 0;
|
||||||
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
|
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
|
||||||
winRes = streamStateFillGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &vlen);
|
winRes = streamStateFillGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &vlen);
|
||||||
|
qDebug("===stream=== get state by cur winres:%d. %s", winRes, __func__);
|
||||||
if (winRes != TSDB_CODE_SUCCESS || isFlushedState(pFileState, pFileState->getTs(pNewPos->pKey), 0)) {
|
if (winRes != TSDB_CODE_SUCCESS || isFlushedState(pFileState, pFileState->getTs(pNewPos->pKey), 0)) {
|
||||||
destroyRowBuffPos(pNewPos);
|
destroyRowBuffPos(pNewPos);
|
||||||
SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
|
SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
|
||||||
|
@ -1085,9 +1088,17 @@ int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (vlen != pFileState->rowSize) {
|
||||||
|
qError("row size mismatch, expect:%d, actual:%d", pFileState->rowSize, vlen);
|
||||||
|
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||||
|
taosMemoryFreeClear(pVal);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
}
|
||||||
|
|
||||||
memcpy(pNewPos->pRowBuff, pVal, vlen);
|
memcpy(pNewPos->pRowBuff, pVal, vlen);
|
||||||
taosMemoryFreeClear(pVal);
|
taosMemoryFreeClear(pVal);
|
||||||
pNewPos->beFlushed = true;
|
pNewPos->beFlushed = true;
|
||||||
|
qDebug("===stream=== read checkpoint state from disc. %s", __func__);
|
||||||
winRes = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
|
winRes = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
|
||||||
if (winRes != TSDB_CODE_SUCCESS) {
|
if (winRes != TSDB_CODE_SUCCESS) {
|
||||||
destroyRowBuffPos(pNewPos);
|
destroyRowBuffPos(pNewPos);
|
||||||
|
|
Loading…
Reference in New Issue