Merge pull request #20243 from taosdata/fix/main_bugfix_wxy
fix: last cache read problem after schema change
This commit is contained in:
commit
ed5a58fbb6
|
@ -766,6 +766,7 @@ typedef struct SCacheRowsReader {
|
||||||
TdThreadMutex readerMutex;
|
TdThreadMutex readerMutex;
|
||||||
SVnode *pVnode;
|
SVnode *pVnode;
|
||||||
STSchema *pSchema;
|
STSchema *pSchema;
|
||||||
|
STSchema *pCurrSchema;
|
||||||
uint64_t uid;
|
uint64_t uid;
|
||||||
uint64_t suid;
|
uint64_t suid;
|
||||||
char **transferBuf; // todo remove it soon
|
char **transferBuf; // todo remove it soon
|
||||||
|
|
|
@ -1390,17 +1390,57 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr) {
|
static int32_t cloneTSchema(STSchema *pSrc, STSchema **ppDst) {
|
||||||
int32_t code = 0;
|
int32_t len = sizeof(STSchema) + sizeof(STColumn) * pSrc->numOfCols;
|
||||||
|
*ppDst = taosMemoryMalloc(len);
|
||||||
|
if (NULL == *ppDst) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
memcpy(*ppDst, pSrc, len);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t updateTSchema(int32_t sversion, SCacheRowsReader *pReader, uint64_t uid) {
|
||||||
|
if (NULL == pReader->pCurrSchema && sversion == pReader->pSchema->version) {
|
||||||
|
return cloneTSchema(pReader->pSchema, &pReader->pCurrSchema);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (NULL != pReader->pCurrSchema && sversion == pReader->pCurrSchema->version) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosMemoryFreeClear(pReader->pCurrSchema);
|
||||||
|
return metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pCurrSchema);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t initLastColArray(STSchema *pTSchema, SArray **ppColArray) {
|
||||||
|
SArray *pColArray = taosArrayInit(pTSchema->numOfCols, sizeof(SLastCol));
|
||||||
|
if (NULL == pColArray) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pTSchema->numOfCols; ++i) {
|
||||||
|
SLastCol col = {.ts = 0, .colVal = COL_VAL_NULL(pTSchema->columns[i].colId, pTSchema->columns[i].type)};
|
||||||
|
taosArrayPush(pColArray, &col);
|
||||||
|
}
|
||||||
|
*ppColArray = pColArray;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr) {
|
||||||
STSchema *pTSchema = pr->pSchema; // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
|
STSchema *pTSchema = pr->pSchema; // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
|
||||||
int16_t nCol = pTSchema->numOfCols;
|
int16_t nLastCol = pTSchema->numOfCols;
|
||||||
int16_t iCol = 0;
|
|
||||||
int16_t noneCol = 0;
|
int16_t noneCol = 0;
|
||||||
bool setNoneCol = false;
|
bool setNoneCol = false;
|
||||||
SArray *pColArray = taosArrayInit(nCol, sizeof(SLastCol));
|
bool hasRow = false;
|
||||||
|
SArray *pColArray = NULL;
|
||||||
SColVal *pColVal = &(SColVal){0};
|
SColVal *pColVal = &(SColVal){0};
|
||||||
|
|
||||||
|
int32_t code = initLastColArray(pTSchema, &pColArray);
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
TSKEY lastRowTs = TSKEY_MAX;
|
TSKEY lastRowTs = TSKEY_MAX;
|
||||||
|
|
||||||
CacheNextRowIter iter = {0};
|
CacheNextRowIter iter = {0};
|
||||||
|
@ -1415,6 +1455,15 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
hasRow = true;
|
||||||
|
|
||||||
|
code = updateTSchema(TSDBROW_SVERSION(pRow), pr, uid);
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
pTSchema = pr->pCurrSchema;
|
||||||
|
int16_t nCol = pTSchema->numOfCols;
|
||||||
|
|
||||||
TSKEY rowTs = TSDBROW_TS(pRow);
|
TSKEY rowTs = TSDBROW_TS(pRow);
|
||||||
|
|
||||||
if (lastRowTs == TSKEY_MAX) {
|
if (lastRowTs == TSKEY_MAX) {
|
||||||
|
@ -1422,28 +1471,27 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach
|
||||||
STColumn *pTColumn = &pTSchema->columns[0];
|
STColumn *pTColumn = &pTSchema->columns[0];
|
||||||
|
|
||||||
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = lastRowTs});
|
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = lastRowTs});
|
||||||
if (taosArrayPush(pColArray, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal}) == NULL) {
|
taosArraySet(pColArray, 0, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal});
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (iCol = 1; iCol < nCol; ++iCol) {
|
for (int16_t iCol = 1; iCol < nCol; ++iCol) {
|
||||||
|
if (iCol >= nLastCol) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
SLastCol *pCol = taosArrayGet(pColArray, iCol);
|
||||||
|
if (pCol->colVal.cid != pTSchema->columns[iCol].colId) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
|
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
|
||||||
|
|
||||||
SLastCol lastCol = {.ts = lastRowTs, .colVal = *pColVal};
|
*pCol = (SLastCol){.ts = lastRowTs, .colVal = *pColVal};
|
||||||
if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) {
|
if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) {
|
||||||
lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData);
|
pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);
|
||||||
if (lastCol.colVal.value.pData == NULL) {
|
if (pCol->colVal.value.pData == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
|
memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
|
||||||
}
|
|
||||||
|
|
||||||
if (taosArrayPush(pColArray, &lastCol) == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _err;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!COL_VAL_IS_VALUE(pColVal) && !setNoneCol) {
|
if (!COL_VAL_IS_VALUE(pColVal) && !setNoneCol) {
|
||||||
|
@ -1461,10 +1509,16 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach
|
||||||
|
|
||||||
// merge into pColArray
|
// merge into pColArray
|
||||||
setNoneCol = false;
|
setNoneCol = false;
|
||||||
for (iCol = noneCol; iCol < nCol; ++iCol) {
|
for (int16_t iCol = noneCol; iCol < nCol; ++iCol) {
|
||||||
|
if (iCol >= nLastCol) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
// high version's column value
|
// high version's column value
|
||||||
SLastCol *lastColVal = (SLastCol *)taosArrayGet(pColArray, iCol);
|
SLastCol *lastColVal = (SLastCol *)taosArrayGet(pColArray, iCol);
|
||||||
SColVal *tColVal = &lastColVal->colVal;
|
if (lastColVal->colVal.cid != pTSchema->columns[iCol].colId) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
SColVal *tColVal = &lastColVal->colVal;
|
||||||
|
|
||||||
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
|
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
|
||||||
if (!COL_VAL_IS_VALUE(tColVal) && COL_VAL_IS_VALUE(pColVal)) {
|
if (!COL_VAL_IS_VALUE(tColVal) && COL_VAL_IS_VALUE(pColVal)) {
|
||||||
|
@ -1494,6 +1548,9 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach
|
||||||
//*ppLastArray = NULL;
|
//*ppLastArray = NULL;
|
||||||
// taosArrayDestroy(pColArray);
|
// taosArrayDestroy(pColArray);
|
||||||
//} else {
|
//} else {
|
||||||
|
if (!hasRow) {
|
||||||
|
taosArrayClear(pColArray);
|
||||||
|
}
|
||||||
*ppLastArray = pColArray;
|
*ppLastArray = pColArray;
|
||||||
//}
|
//}
|
||||||
|
|
||||||
|
|
|
@ -209,6 +209,8 @@ void* tsdbCacherowsReaderClose(void* pReader) {
|
||||||
taosMemoryFree(p->pSchema);
|
taosMemoryFree(p->pSchema);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosMemoryFree(p->pCurrSchema);
|
||||||
|
|
||||||
destroyLastBlockLoadInfo(p->pLoadInfo);
|
destroyLastBlockLoadInfo(p->pLoadInfo);
|
||||||
|
|
||||||
taosMemoryFree((void*)p->idstr);
|
taosMemoryFree((void*)p->idstr);
|
||||||
|
@ -303,7 +305,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
|
|
||||||
for (int32_t i = 0; i < pr->pSchema->numOfCols; ++i) {
|
for (int32_t i = 0; i < pr->pSchema->numOfCols; ++i) {
|
||||||
struct STColumn* pCol = &pr->pSchema->columns[i];
|
struct STColumn* pCol = &pr->pSchema->columns[i];
|
||||||
SLastCol p = {.ts = INT64_MIN, .colVal.type = pCol->type};
|
SLastCol p = {.ts = INT64_MIN, .colVal.type = pCol->type, .colVal.flag = CV_FLAG_NULL};
|
||||||
|
|
||||||
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
||||||
p.colVal.value.pData = taosMemoryCalloc(pCol->bytes, sizeof(char));
|
p.colVal.value.pData = taosMemoryCalloc(pCol->bytes, sizeof(char));
|
||||||
|
|
Loading…
Reference in New Issue