refactor: do some refactor.

This commit is contained in:
Haojun Liao 2024-07-15 23:53:12 +08:00
parent 25f5c9dc5e
commit dd590f36e2
6 changed files with 161 additions and 98 deletions

View File

@ -136,7 +136,7 @@ int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag);
int32_t tRowUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData, int32_t flag); int32_t tRowUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData, int32_t flag);
void tRowGetPrimaryKey(SRow *pRow, SRowKey *key); void tRowGetPrimaryKey(SRow *pRow, SRowKey *key);
int32_t tRowKeyCompare(const SRowKey *key1, const SRowKey *key2); int32_t tRowKeyCompare(const SRowKey *key1, const SRowKey *key2);
int32_t tRowKeyAssign(SRowKey *pDst, SRowKey *pSrc); void tRowKeyAssign(SRowKey *pDst, SRowKey *pSrc);
// SRowIter ================================ // SRowIter ================================
int32_t tRowIterOpen(SRow *pRow, STSchema *pTSchema, SRowIter **ppIter); int32_t tRowIterOpen(SRow *pRow, STSchema *pTSchema, SRowIter **ppIter);

View File

@ -305,11 +305,11 @@ static int32_t tRowBuildTupleRow(SArray *aColVal, const SRowBuildScanInfo *sinfo
*(int32_t *)(fixed + schema->columns[i].offset) = varlen - fixed - sinfo->tupleFixedSize; *(int32_t *)(fixed + schema->columns[i].offset) = varlen - fixed - sinfo->tupleFixedSize;
varlen += tPutU32v(varlen, colValArray[colValIndex].value.nData); varlen += tPutU32v(varlen, colValArray[colValIndex].value.nData);
if (colValArray[colValIndex].value.nData) { if (colValArray[colValIndex].value.nData) {
memcpy(varlen, colValArray[colValIndex].value.pData, colValArray[colValIndex].value.nData); (void)memcpy(varlen, colValArray[colValIndex].value.pData, colValArray[colValIndex].value.nData);
varlen += colValArray[colValIndex].value.nData; varlen += colValArray[colValIndex].value.nData;
} }
} else { } else {
memcpy(fixed + schema->columns[i].offset, &colValArray[colValIndex].value.val, (void)memcpy(fixed + schema->columns[i].offset, &colValArray[colValIndex].value.val,
tDataTypes[schema->columns[i].type].bytes); tDataTypes[schema->columns[i].type].bytes);
} }
} else if (COL_VAL_IS_NULL(&colValArray[colValIndex])) { // NULL } else if (COL_VAL_IS_NULL(&colValArray[colValIndex])) { // NULL
@ -384,12 +384,12 @@ static int32_t tRowBuildKVRow(SArray *aColVal, const SRowBuildScanInfo *sinfo, c
payloadSize += tPutI16v(payload + payloadSize, colValArray[colValIndex].cid); payloadSize += tPutI16v(payload + payloadSize, colValArray[colValIndex].cid);
payloadSize += tPutU32v(payload + payloadSize, colValArray[colValIndex].value.nData); payloadSize += tPutU32v(payload + payloadSize, colValArray[colValIndex].value.nData);
if (colValArray[colValIndex].value.nData > 0) { if (colValArray[colValIndex].value.nData > 0) {
memcpy(payload + payloadSize, colValArray[colValIndex].value.pData, colValArray[colValIndex].value.nData); (void)memcpy(payload + payloadSize, colValArray[colValIndex].value.pData, colValArray[colValIndex].value.nData);
} }
payloadSize += colValArray[colValIndex].value.nData; payloadSize += colValArray[colValIndex].value.nData;
} else { } else {
payloadSize += tPutI16v(payload + payloadSize, colValArray[colValIndex].cid); payloadSize += tPutI16v(payload + payloadSize, colValArray[colValIndex].cid);
memcpy(payload + payloadSize, &colValArray[colValIndex].value.val, (void)memcpy(payload + payloadSize, &colValArray[colValIndex].value.val,
tDataTypes[schema->columns[i].type].bytes); tDataTypes[schema->columns[i].type].bytes);
payloadSize += tDataTypes[schema->columns[i].type].bytes; payloadSize += tDataTypes[schema->columns[i].type].bytes;
} }
@ -475,7 +475,7 @@ int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted,
value.nData = infos[iInfo].bind->length[iRow]; value.nData = infos[iInfo].bind->length[iRow];
value.pData = (uint8_t *)infos[iInfo].bind->buffer + infos[iInfo].bind->buffer_length * iRow; value.pData = (uint8_t *)infos[iInfo].bind->buffer + infos[iInfo].bind->buffer_length * iRow;
} else { } else {
memcpy(&value.val, (uint8_t *)infos[iInfo].bind->buffer + infos[iInfo].bind->buffer_length * iRow, (void)memcpy(&value.val, (uint8_t *)infos[iInfo].bind->buffer + infos[iInfo].bind->buffer_length * iRow,
infos[iInfo].bind->buffer_length); infos[iInfo].bind->buffer_length);
} }
colVal = COL_VAL_VALUE(infos[iInfo].columnId, value); colVal = COL_VAL_VALUE(infos[iInfo].columnId, value);
@ -509,7 +509,7 @@ int32_t tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal)
pColVal->cid = pTColumn->colId; pColVal->cid = pTColumn->colId;
pColVal->value.type = pTColumn->type; pColVal->value.type = pTColumn->type;
pColVal->flag = CV_FLAG_VALUE; pColVal->flag = CV_FLAG_VALUE;
memcpy(&pColVal->value.val, &pRow->ts, sizeof(TSKEY)); (void)memcpy(&pColVal->value.val, &pRow->ts, sizeof(TSKEY));
return 0; return 0;
} }
@ -573,7 +573,7 @@ int32_t tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal)
pColVal->value.pData = NULL; pColVal->value.pData = NULL;
} }
} else { } else {
memcpy(&pColVal->value.val, pData, pTColumn->bytes); (void)memcpy(&pColVal->value.val, pData, pTColumn->bytes);
} }
} }
return 0; return 0;
@ -624,7 +624,7 @@ int32_t tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal)
pColVal->value.pData = varlen + *(int32_t *)(fixed + pTColumn->offset); pColVal->value.pData = varlen + *(int32_t *)(fixed + pTColumn->offset);
pColVal->value.pData += tGetU32v(pColVal->value.pData, &pColVal->value.nData); pColVal->value.pData += tGetU32v(pColVal->value.pData, &pColVal->value.nData);
} else { } else {
memcpy(&pColVal->value.val, fixed + pTColumn->offset, TYPE_BYTES[pTColumn->type]); (void)memcpy(&pColVal->value.val, fixed + pTColumn->offset, TYPE_BYTES[pTColumn->type]);
} }
} }
@ -861,7 +861,7 @@ SColVal *tRowIterNext(SRowIter *pIter) {
pIter->cv.cid = pTColumn->colId; pIter->cv.cid = pTColumn->colId;
pIter->cv.value.type = pTColumn->type; pIter->cv.value.type = pTColumn->type;
pIter->cv.flag = CV_FLAG_VALUE; pIter->cv.flag = CV_FLAG_VALUE;
memcpy(&pIter->cv.value.val, &pIter->pRow->ts, sizeof(TSKEY)); (void)memcpy(&pIter->cv.value.val, &pIter->pRow->ts, sizeof(TSKEY));
goto _exit; goto _exit;
} }
@ -906,7 +906,7 @@ SColVal *tRowIterNext(SRowIter *pIter) {
pIter->cv.value.pData = NULL; pIter->cv.value.pData = NULL;
} }
} else { } else {
memcpy(&pIter->cv.value.val, pData, pTColumn->bytes); (void)memcpy(&pIter->cv.value.val, pData, pTColumn->bytes);
} }
} }
@ -965,7 +965,7 @@ SColVal *tRowIterNext(SRowIter *pIter) {
pIter->cv.value.pData = NULL; pIter->cv.value.pData = NULL;
} }
} else { } else {
memcpy(&pIter->cv.value.val, pIter->pf + pTColumn->offset, TYPE_BYTES[pTColumn->type]); (void)memcpy(&pIter->cv.value.val, pIter->pf + pTColumn->offset, TYPE_BYTES[pTColumn->type]);
} }
goto _exit; goto _exit;
} }
@ -1288,7 +1288,7 @@ void tRowGetPrimaryKey(SRow *row, SRowKey *key) {
key->pks[i].pData = tdata; key->pks[i].pData = tdata;
key->pks[i].pData += tGetU32v(key->pks[i].pData, &key->pks[i].nData); key->pks[i].pData += tGetU32v(key->pks[i].pData, &key->pks[i].nData);
} else { } else {
memcpy(&key->pks[i].val, tdata, tDataTypes[indices[i].type].bytes); (void)memcpy(&key->pks[i].val, tdata, tDataTypes[indices[i].type].bytes);
} }
} }
} }
@ -1378,7 +1378,7 @@ FORCE_INLINE int32_t tRowKeyCompare(const SRowKey *key1, const SRowKey *key2) {
return 0; return 0;
} }
int32_t tRowKeyAssign(SRowKey *pDst, SRowKey *pSrc) { void tRowKeyAssign(SRowKey *pDst, SRowKey *pSrc) {
pDst->ts = pSrc->ts; pDst->ts = pSrc->ts;
pDst->numOfPKs = pSrc->numOfPKs; pDst->numOfPKs = pSrc->numOfPKs;
@ -1392,12 +1392,10 @@ int32_t tRowKeyAssign(SRowKey *pDst, SRowKey *pSrc) {
} else { } else {
pVal->nData = pSrc->pks[i].nData; pVal->nData = pSrc->pks[i].nData;
ASSERT(pSrc->pks[i].pData != NULL); ASSERT(pSrc->pks[i].pData != NULL);
memcpy(pVal->pData, pSrc->pks[i].pData, pVal->nData); (void)memcpy(pVal->pData, pSrc->pks[i].pData, pVal->nData);
} }
} }
} }
return TSDB_CODE_SUCCESS;
} }
// STag ======================================== // STag ========================================
@ -1528,7 +1526,7 @@ static int32_t tPutTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson) {
} else { } else {
p = p ? p + n : p; p = p ? p + n : p;
n += tDataTypes[pTagVal->type].bytes; n += tDataTypes[pTagVal->type].bytes;
if (p) memcpy(p, &(pTagVal->i64), tDataTypes[pTagVal->type].bytes); if (p) (void)memcpy(p, &(pTagVal->i64), tDataTypes[pTagVal->type].bytes);
} }
return n; return n;
@ -1550,7 +1548,7 @@ static int32_t tGetTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson) {
if (IS_VAR_DATA_TYPE(pTagVal->type)) { if (IS_VAR_DATA_TYPE(pTagVal->type)) {
n += tGetBinary(p + n, &pTagVal->pData, &pTagVal->nData); n += tGetBinary(p + n, &pTagVal->pData, &pTagVal->nData);
} else { } else {
memcpy(&(pTagVal->i64), p + n, tDataTypes[pTagVal->type].bytes); (void)memcpy(&(pTagVal->i64), p + n, tDataTypes[pTagVal->type].bytes);
n += tDataTypes[pTagVal->type].bytes; n += tDataTypes[pTagVal->type].bytes;
} }
@ -1661,7 +1659,7 @@ char *tTagValToData(const STagVal *value, bool isJson) {
} }
varDataLen(data + typeBytes) = value->nData; varDataLen(data + typeBytes) = value->nData;
memcpy(varDataVal(data + typeBytes), value->pData, value->nData); (void)memcpy(varDataVal(data + typeBytes), value->pData, value->nData);
} else { } else {
data = ((char *)&(value->i64)) - typeBytes; // json with type data = ((char *)&(value->i64)) - typeBytes; // json with type
} }
@ -1713,7 +1711,7 @@ bool tTagGet(const STag *pTag, STagVal *pTagVal) {
} else if (c > 0) { } else if (c > 0) {
lidx = midx + 1; lidx = midx + 1;
} else { } else {
memcpy(pTagVal, &tv, sizeof(tv)); (void)memcpy(pTagVal, &tv, sizeof(tv));
return true; return true;
} }
} }
@ -1892,7 +1890,7 @@ static FORCE_INLINE int32_t tColDataPutValue(SColData *pColData, uint8_t *pData,
if (nData) { if (nData) {
code = tRealloc(&pColData->pData, pColData->nData + nData); code = tRealloc(&pColData->pData, pColData->nData + nData);
if (code) goto _exit; if (code) goto _exit;
memcpy(pColData->pData + pColData->nData, pData, nData); (void)memcpy(pColData->pData + pColData->nData, pData, nData);
pColData->nData += nData; pColData->nData += nData;
} }
} else { } else {
@ -1900,7 +1898,7 @@ static FORCE_INLINE int32_t tColDataPutValue(SColData *pColData, uint8_t *pData,
code = tRealloc(&pColData->pData, pColData->nData + tDataTypes[pColData->type].bytes); code = tRealloc(&pColData->pData, pColData->nData + tDataTypes[pColData->type].bytes);
if (code) goto _exit; if (code) goto _exit;
if (pData) { if (pData) {
memcpy(pColData->pData + pColData->nData, pData, TYPE_BYTES[pColData->type]); (void)memcpy(pColData->pData + pColData->nData, pData, TYPE_BYTES[pColData->type]);
} else { } else {
memset(pColData->pData + pColData->nData, 0, TYPE_BYTES[pColData->type]); memset(pColData->pData + pColData->nData, 0, TYPE_BYTES[pColData->type]);
} }
@ -2594,7 +2592,7 @@ static FORCE_INLINE void tColDataGetValue4(SColData *pColData, int32_t iVal, SCo
} }
value.pData = pColData->pData + pColData->aOffset[iVal]; value.pData = pColData->pData + pColData->aOffset[iVal];
} else { } else {
memcpy(&value.val, pColData->pData + tDataTypes[pColData->type].bytes * iVal, tDataTypes[pColData->type].bytes); (void)memcpy(&value.val, pColData->pData + tDataTypes[pColData->type].bytes * iVal, tDataTypes[pColData->type].bytes);
} }
*pColVal = COL_VAL_VALUE(pColData->cid, value); *pColVal = COL_VAL_VALUE(pColData->cid, value);
} }
@ -2692,7 +2690,7 @@ int32_t tColDataCopy(SColData *pColDataFrom, SColData *pColData, xMallocFn xMall
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
memcpy(pColData->pBitMap, pColDataFrom->pBitMap, BIT1_SIZE(pColData->nVal)); (void)memcpy(pColData->pBitMap, pColDataFrom->pBitMap, BIT1_SIZE(pColData->nVal));
break; break;
case (HAS_VALUE | HAS_NULL | HAS_NONE): case (HAS_VALUE | HAS_NULL | HAS_NONE):
pColData->pBitMap = xMalloc(arg, BIT2_SIZE(pColData->nVal)); pColData->pBitMap = xMalloc(arg, BIT2_SIZE(pColData->nVal));
@ -2700,7 +2698,7 @@ int32_t tColDataCopy(SColData *pColDataFrom, SColData *pColData, xMallocFn xMall
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
memcpy(pColData->pBitMap, pColDataFrom->pBitMap, BIT2_SIZE(pColData->nVal)); (void)memcpy(pColData->pBitMap, pColDataFrom->pBitMap, BIT2_SIZE(pColData->nVal));
break; break;
default: default:
pColData->pBitMap = NULL; pColData->pBitMap = NULL;
@ -2714,7 +2712,7 @@ int32_t tColDataCopy(SColData *pColDataFrom, SColData *pColData, xMallocFn xMall
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
memcpy(pColData->aOffset, pColDataFrom->aOffset, pColData->nVal << 2); (void)memcpy(pColData->aOffset, pColDataFrom->aOffset, pColData->nVal << 2);
} else { } else {
pColData->aOffset = NULL; pColData->aOffset = NULL;
} }
@ -2727,7 +2725,7 @@ int32_t tColDataCopy(SColData *pColDataFrom, SColData *pColData, xMallocFn xMall
goto _exit; goto _exit;
} }
memcpy(pColData->pData, pColDataFrom->pData, pColData->nData); (void)memcpy(pColData->pData, pColDataFrom->pData, pColData->nData);
} else { } else {
pColData->pData = NULL; pColData->pData = NULL;
} }
@ -3119,10 +3117,10 @@ static int32_t tColDataCopyRowCell(SColData *pFromColData, int32_t iFromRow, SCo
pToColData->aOffset[iToRow + 1] = pToColData->aOffset[iToRow] + nData; pToColData->aOffset[iToRow + 1] = pToColData->aOffset[iToRow] + nData;
} }
memcpy(pToColData->pData + pToColData->aOffset[iToRow], pFromColData->pData + pFromColData->aOffset[iFromRow], (void)memcpy(pToColData->pData + pToColData->aOffset[iToRow], pFromColData->pData + pFromColData->aOffset[iFromRow],
nData); nData);
} else { } else {
memcpy(&pToColData->pData[TYPE_BYTES[pToColData->type] * iToRow], (void)memcpy(&pToColData->pData[TYPE_BYTES[pToColData->type] * iToRow],
&pFromColData->pData[TYPE_BYTES[pToColData->type] * iFromRow], TYPE_BYTES[pToColData->type]); &pFromColData->pData[TYPE_BYTES[pToColData->type] * iFromRow], TYPE_BYTES[pToColData->type]);
} }
return code; return code;
@ -3361,7 +3359,7 @@ static void tColDataMergeImpl(SColData *pColData, int32_t iStart, int32_t iEnd /
ASSERT(0); ASSERT(0);
} else { } else {
if (iv != iStart) { if (iv != iStart) {
memcpy(&pColData->pData[TYPE_BYTES[pColData->type] * iStart], (void)memcpy(&pColData->pData[TYPE_BYTES[pColData->type] * iStart],
&pColData->pData[TYPE_BYTES[pColData->type] * iv], TYPE_BYTES[pColData->type]); &pColData->pData[TYPE_BYTES[pColData->type] * iv], TYPE_BYTES[pColData->type]);
} }
memmove(&pColData->pData[TYPE_BYTES[pColData->type] * (iStart + 1)], memmove(&pColData->pData[TYPE_BYTES[pColData->type] * (iStart + 1)],
@ -3579,11 +3577,11 @@ static int32_t tPutColDataVersion0(uint8_t *pBuf, SColData *pColData) {
case (HAS_NULL | HAS_NONE): case (HAS_NULL | HAS_NONE):
case (HAS_VALUE | HAS_NONE): case (HAS_VALUE | HAS_NONE):
case (HAS_VALUE | HAS_NULL): case (HAS_VALUE | HAS_NULL):
if (pBuf) memcpy(pBuf + n, pColData->pBitMap, BIT1_SIZE(pColData->nVal)); if (pBuf) (void)memcpy(pBuf + n, pColData->pBitMap, BIT1_SIZE(pColData->nVal));
n += BIT1_SIZE(pColData->nVal); n += BIT1_SIZE(pColData->nVal);
break; break;
case (HAS_VALUE | HAS_NULL | HAS_NONE): case (HAS_VALUE | HAS_NULL | HAS_NONE):
if (pBuf) memcpy(pBuf + n, pColData->pBitMap, BIT2_SIZE(pColData->nVal)); if (pBuf) (void)memcpy(pBuf + n, pColData->pBitMap, BIT2_SIZE(pColData->nVal));
n += BIT2_SIZE(pColData->nVal); n += BIT2_SIZE(pColData->nVal);
break; break;
default: default:
@ -3593,14 +3591,14 @@ static int32_t tPutColDataVersion0(uint8_t *pBuf, SColData *pColData) {
// value // value
if (pColData->flag & HAS_VALUE) { if (pColData->flag & HAS_VALUE) {
if (IS_VAR_DATA_TYPE(pColData->type)) { if (IS_VAR_DATA_TYPE(pColData->type)) {
if (pBuf) memcpy(pBuf + n, pColData->aOffset, pColData->nVal << 2); if (pBuf) (void)memcpy(pBuf + n, pColData->aOffset, pColData->nVal << 2);
n += (pColData->nVal << 2); n += (pColData->nVal << 2);
n += tPutI32v(pBuf ? pBuf + n : NULL, pColData->nData); n += tPutI32v(pBuf ? pBuf + n : NULL, pColData->nData);
if (pBuf) memcpy(pBuf + n, pColData->pData, pColData->nData); if (pBuf) (void)memcpy(pBuf + n, pColData->pData, pColData->nData);
n += pColData->nData; n += pColData->nData;
} else { } else {
if (pBuf) memcpy(pBuf + n, pColData->pData, pColData->nData); if (pBuf) (void)memcpy(pBuf + n, pColData->pData, pColData->nData);
n += pColData->nData; n += pColData->nData;
} }
} }
@ -4348,7 +4346,7 @@ int32_t tCompressData(void *input, // input
ASSERT(outputSize >= extraSizeNeeded); ASSERT(outputSize >= extraSizeNeeded);
if (info->cmprAlg == NO_COMPRESSION) { if (info->cmprAlg == NO_COMPRESSION) {
memcpy(output, input, info->originalSize); (void)memcpy(output, input, info->originalSize);
info->compressedSize = info->originalSize; info->compressedSize = info->originalSize;
} else if (info->cmprAlg == ONE_STAGE_COMP || info->cmprAlg == TWO_STAGE_COMP) { } else if (info->cmprAlg == ONE_STAGE_COMP || info->cmprAlg == TWO_STAGE_COMP) {
SBuffer local; SBuffer local;
@ -4385,7 +4383,7 @@ int32_t tCompressData(void *input, // input
} else { } else {
DEFINE_VAR(info->cmprAlg) DEFINE_VAR(info->cmprAlg)
if ((l1 == L1_UNKNOWN && l2 == L2_UNKNOWN) || (l1 == L1_DISABLED && l2 == L2_DISABLED)) { if ((l1 == L1_UNKNOWN && l2 == L2_UNKNOWN) || (l1 == L1_DISABLED && l2 == L2_DISABLED)) {
memcpy(output, input, info->originalSize); (void)memcpy(output, input, info->originalSize);
info->compressedSize = info->originalSize; info->compressedSize = info->originalSize;
return 0; return 0;
} }
@ -4431,7 +4429,7 @@ int32_t tDecompressData(void *input, // input
if (info->cmprAlg == NO_COMPRESSION) { if (info->cmprAlg == NO_COMPRESSION) {
ASSERT(info->compressedSize == info->originalSize); ASSERT(info->compressedSize == info->originalSize);
memcpy(output, input, info->compressedSize); (void)memcpy(output, input, info->compressedSize);
} else if (info->cmprAlg == ONE_STAGE_COMP || info->cmprAlg == TWO_STAGE_COMP) { } else if (info->cmprAlg == ONE_STAGE_COMP || info->cmprAlg == TWO_STAGE_COMP) {
SBuffer local; SBuffer local;
@ -4468,7 +4466,7 @@ int32_t tDecompressData(void *input, // input
} else { } else {
DEFINE_VAR(info->cmprAlg); DEFINE_VAR(info->cmprAlg);
if (l1 == L1_DISABLED && l2 == L2_DISABLED) { if (l1 == L1_DISABLED && l2 == L2_DISABLED) {
memcpy(output, input, info->compressedSize); (void)memcpy(output, input, info->compressedSize);
return 0; return 0;
} }
SBuffer local; SBuffer local;

View File

@ -2579,7 +2579,7 @@ typedef struct {
static int32_t lastIterOpen(SFSLastIter *iter, STFileSet *pFileSet, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid, static int32_t lastIterOpen(SFSLastIter *iter, STFileSet *pFileSet, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
tb_uid_t uid, SCacheRowsReader *pr, int64_t lastTs, int16_t *aCols, int nCols) { tb_uid_t uid, SCacheRowsReader *pr, int64_t lastTs, int16_t *aCols, int nCols) {
int32_t code = 0; int32_t code = 0;
pr->pLDataIterArray = destroySttBlockReader(pr->pLDataIterArray, NULL); destroySttBlockReader(pr->pLDataIterArray, NULL);
pr->pLDataIterArray = taosArrayInit(4, POINTER_BYTES); pr->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
SMergeTreeConf conf = { SMergeTreeConf conf = {

View File

@ -203,7 +203,7 @@ int32_t tsdbReuseCacherowsReader(void* reader, void* pTableIdList, int32_t numOf
pReader->pTableList = pTableIdList; pReader->pTableList = pTableIdList;
pReader->numOfTables = numOfTables; pReader->numOfTables = numOfTables;
pReader->lastTs = INT64_MIN; pReader->lastTs = INT64_MIN;
pReader->pLDataIterArray = destroySttBlockReader(pReader->pLDataIterArray, NULL); destroySttBlockReader(pReader->pLDataIterArray, NULL);
pReader->pLDataIterArray = taosArrayInit(4, POINTER_BYTES); pReader->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;

View File

@ -45,7 +45,7 @@ typedef struct {
bool moreThanCapcity; bool moreThanCapcity;
} SDataBlockToLoadInfo; } SDataBlockToLoadInfo;
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter); static int32_t getCurrentBlockInfo(SDataBlockIter* pBlockIter, SFileDataBlockInfo** pInfo);
static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity, static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
STsdbReader* pReader); STsdbReader* pReader);
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader); static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
@ -123,7 +123,7 @@ static void tColRowGetPriamyKeyDeepCopy(SBlockData* pBlock, int32_t irow, int32_
pKey->pks[0].val = cv.value.val; pKey->pks[0].val = cv.value.val;
} else { } else {
pKey->pks[0].nData = cv.value.nData; pKey->pks[0].nData = cv.value.nData;
/*void* p = */memcpy(pKey->pks[0].pData, cv.value.pData, cv.value.nData); (void)(void)memcpy(pKey->pks[0].pData, cv.value.pData, cv.value.nData);
} }
} }
@ -155,9 +155,9 @@ static void tRowGetPrimaryKeyDeepCopy(SRow* pRow, SRowKey* pKey) {
if (IS_VAR_DATA_TYPE(indices[i].type)) { if (IS_VAR_DATA_TYPE(indices[i].type)) {
tdata += tGetU32v(tdata, &pKey->pks[i].nData); tdata += tGetU32v(tdata, &pKey->pks[i].nData);
memcpy(pKey->pks[i].pData, tdata, pKey->pks[i].nData); (void)memcpy(pKey->pks[i].pData, tdata, pKey->pks[i].nData);
} else { } else {
memcpy(&pKey->pks[i].val, data + indices[i].offset, tDataTypes[pKey->pks[i].type].bytes); (void)memcpy(&pKey->pks[i].val, data + indices[i].offset, tDataTypes[pKey->pks[i].type].bytes);
} }
} }
} }
@ -304,7 +304,7 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo
while (1) { while (1) {
if (pReader->pFileReader != NULL) { if (pReader->pFileReader != NULL) {
tsdbDataFileReaderClose(&pReader->pFileReader); (void) tsdbDataFileReaderClose(&pReader->pFileReader);
} }
pReader->status.pCurrentFileset = pIter->pFilesetList->data[pIter->index]; pReader->status.pCurrentFileset = pIter->pFilesetList->data[pIter->index];
@ -377,15 +377,20 @@ _err:
bool shouldFreePkBuf(SBlockLoadSuppInfo* pSupp) { return (pSupp->numOfPks > 0) && IS_VAR_DATA_TYPE(pSupp->pk.type); } bool shouldFreePkBuf(SBlockLoadSuppInfo* pSupp) { return (pSupp->numOfPks > 0) && IS_VAR_DATA_TYPE(pSupp->pk.type); }
void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool needFree) { int32_t resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool needFree) {
pIter->order = order; pIter->order = order;
pIter->index = -1; pIter->index = -1;
pIter->numOfBlocks = 0; pIter->numOfBlocks = 0;
if (pIter->blockList == NULL) { if (pIter->blockList == NULL) {
pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo)); pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
if (pIter->blockList == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
} else { } else {
clearDataBlockIterator(pIter, needFree); clearDataBlockIterator(pIter, needFree);
} }
return TSDB_CODE_SUCCESS;
} }
static void initReaderStatus(SReaderStatus* pStatus) { static void initReaderStatus(SReaderStatus* pStatus) {
@ -818,7 +823,7 @@ static void updateLastKeyInfo(SRowKey* pKey, SFileDataBlockInfo* pBlockInfo, SDa
} else { } else {
uint8_t* p = asc ? pBlockInfo->lastPk.pData : pBlockInfo->firstPk.pData; uint8_t* p = asc ? pBlockInfo->lastPk.pData : pBlockInfo->firstPk.pData;
pKey->pks[0].nData = asc ? varDataLen(pBlockInfo->lastPk.pData) : varDataLen(pBlockInfo->firstPk.pData); pKey->pks[0].nData = asc ? varDataLen(pBlockInfo->lastPk.pData) : varDataLen(pBlockInfo->firstPk.pData);
memcpy(pKey->pks[0].pData, p, pKey->pks[0].nData); (void)memcpy(pKey->pks[0].pData, p, pKey->pks[0].nData);
} }
} }
@ -836,7 +841,7 @@ static int32_t doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int
} }
if (pColVal->value.nData > 0) { // pData may be null, if nData is 0 if (pColVal->value.nData > 0) { // pData may be null, if nData is 0
memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData); (void)memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData);
} }
colDataSetVal(pColInfoData, rowIndex, pSup->buildBuf[colIndex], false); colDataSetVal(pColInfoData, rowIndex, pSup->buildBuf[colIndex], false);
@ -848,19 +853,21 @@ static int32_t doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) { static int32_t getCurrentBlockInfo(SDataBlockIter* pBlockIter, SFileDataBlockInfo** pInfo) {
*pInfo = NULL;
if (pBlockIter->blockList == NULL) { if (pBlockIter->blockList == NULL) {
return NULL; return TSDB_CODE_SUCCESS;
} }
size_t num = TARRAY_SIZE(pBlockIter->blockList); size_t num = TARRAY_SIZE(pBlockIter->blockList);
if (num == 0) { if (num == 0) {
ASSERT(pBlockIter->numOfBlocks == num); ASSERT(pBlockIter->numOfBlocks == num);
return NULL; return TSDB_CODE_SUCCESS;
} }
SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, pBlockIter->index); *pInfo = taosArrayGet(pBlockIter->blockList, pBlockIter->index);
return pBlockInfo; return (*pInfo) != NULL? TSDB_CODE_SUCCESS:TSDB_CODE_FAILED;
} }
static int doBinarySearchKey(const TSKEY* keyList, int num, int pos, TSKEY key, int order) { static int doBinarySearchKey(const TSKEY* keyList, int num, int pos, TSKEY key, int order) {
@ -974,10 +981,10 @@ static int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData
static void copyPrimaryTsCol(const SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo, SColumnInfoData* pColData, static void copyPrimaryTsCol(const SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo, SColumnInfoData* pColData,
int32_t dumpedRows, bool asc) { int32_t dumpedRows, bool asc) {
if (asc) { if (asc) {
memcpy(pColData->pData, &pBlockData->aTSKEY[pDumpInfo->rowIndex], dumpedRows * sizeof(int64_t)); (void)memcpy(pColData->pData, &pBlockData->aTSKEY[pDumpInfo->rowIndex], dumpedRows * sizeof(int64_t));
} else { } else {
int32_t startIndex = pDumpInfo->rowIndex - dumpedRows + 1; int32_t startIndex = pDumpInfo->rowIndex - dumpedRows + 1;
memcpy(pColData->pData, &pBlockData->aTSKEY[startIndex], dumpedRows * sizeof(int64_t)); (void)memcpy(pColData->pData, &pBlockData->aTSKEY[startIndex], dumpedRows * sizeof(int64_t));
// todo: opt perf by extract the loop // todo: opt perf by extract the loop
// reverse the array list // reverse the array list
@ -1008,7 +1015,7 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo
// ASSERT((((uint64_t)pColData->pData) & (0x8 - 1)) == 0); // ASSERT((((uint64_t)pColData->pData) & (0x8 - 1)) == 0);
// 1. copy data in a batch model // 1. copy data in a batch model
memcpy(pColData->pData, p, dumpedRows * tDataTypes[pData->type].bytes); (void)memcpy(pColData->pData, p, dumpedRows * tDataTypes[pData->type].bytes);
// 2. reverse the array list in case of descending order scan data block // 2. reverse the array list in case of descending order scan data block
if (!asc) { if (!asc) {
@ -1123,7 +1130,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, SRowKey* pLastPro
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SBlockData* pBlockData = &pStatus->fileBlockData; SBlockData* pBlockData = &pStatus->fileBlockData;
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); SFileDataBlockInfo* pBlockInfo = NULL;
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
int32_t numOfOutputCols = pSupInfo->numOfCols; int32_t numOfOutputCols = pSupInfo->numOfCols;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -1131,6 +1138,11 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, SRowKey* pLastPro
bool asc = ASCENDING_TRAVERSE(pReader->info.order); bool asc = ASCENDING_TRAVERSE(pReader->info.order);
int32_t step = asc ? 1 : -1; int32_t step = asc ? 1 : -1;
code = getCurrentBlockInfo(pBlockIter, &pBlockInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
SColVal cv = {0}; SColVal cv = {0};
SBrinRecord tmp; SBrinRecord tmp;
blockInfoToRecord(&tmp, pBlockInfo, pSupInfo); blockInfoToRecord(&tmp, pBlockInfo, pSupInfo);
@ -1314,9 +1326,11 @@ static FORCE_INLINE STSchema* getTableSchemaImpl(STsdbReader* pReader, uint64_t
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData, static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData,
uint64_t uid) { uint64_t uid) {
int32_t code = 0; int32_t code = 0;
STSchema* pSchema = pReader->info.pSchema; STSchema* pSchema = pReader->info.pSchema;
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
SFileDataBlockInfo* pBlockInfo = NULL;
SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
tBlockDataReset(pBlockData); tBlockDataReset(pBlockData);
@ -1328,8 +1342,11 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
} }
} }
SBlockLoadSuppInfo* pSup = &pReader->suppInfo; code = getCurrentBlockInfo(pBlockIter, &pBlockInfo);
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); if (code != TSDB_CODE_SUCCESS) {
return code;
}
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SBrinRecord tmp; SBrinRecord tmp;
@ -2524,11 +2541,12 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
int32_t step = asc ? 1 : -1; int32_t step = asc ? 1 : -1;
double el = 0; double el = 0;
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); SFileDataBlockInfo* pBlockInfo = NULL;
SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader; SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
if (pBlockInfo == NULL) { code = getCurrentBlockInfo(&pReader->status.blockIter, &pBlockInfo);
if (code != TSDB_CODE_SUCCESS) {
return 0; return 0;
} }
@ -2572,7 +2590,10 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
pDumpInfo->rowIndex += step; pDumpInfo->rowIndex += step;
if (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0) { if (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0) {
pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); // NOTE: get the new block info code = getCurrentBlockInfo(&pReader->status.blockIter, &pBlockInfo); // NOTE: get the new block info
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
// continue check for the next file block if the last ts in the current block // continue check for the next file block if the last ts in the current block
// is overlapped with the next neighbor block // is overlapped with the next neighbor block
@ -2797,6 +2818,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
return loadDataFileTombDataForAll(pReader); return loadDataFileTombDataForAll(pReader);
} }
// pTableIter can be NULL, no need to handle the return value
static void resetTableListIndex(SReaderStatus* pStatus) { static void resetTableListIndex(SReaderStatus* pStatus) {
STableUidList* pList = &pStatus->uidList; STableUidList* pList = &pStatus->uidList;
@ -2838,7 +2860,7 @@ static bool moveToNextTableForPreFileSetMem(SReaderStatus* pStatus) {
return (pStatus->pProcMemTableIter != NULL); return (pStatus->pProcMemTableIter != NULL);
} }
static void buildCleanBlockFromSttFiles(STsdbReader* pReader, STableBlockScanInfo* pScanInfo) { static int32_t buildCleanBlockFromSttFiles(STsdbReader* pReader, STableBlockScanInfo* pScanInfo) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
SSttBlockReader* pSttBlockReader = pStatus->fileIter.pSttBlockReader; SSttBlockReader* pSttBlockReader = pStatus->fileIter.pSttBlockReader;
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
@ -2846,7 +2868,10 @@ static void buildCleanBlockFromSttFiles(STsdbReader* pReader, STableBlockScanInf
bool asc = ASCENDING_TRAVERSE(pReader->info.order); bool asc = ASCENDING_TRAVERSE(pReader->info.order);
SDataBlockInfo* pInfo = &pResBlock->info; SDataBlockInfo* pInfo = &pResBlock->info;
blockDataEnsureCapacity(pResBlock, pScanInfo->numOfRowsInStt); int32_t code = blockDataEnsureCapacity(pResBlock, pScanInfo->numOfRowsInStt);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pInfo->rows = pScanInfo->numOfRowsInStt; pInfo->rows = pScanInfo->numOfRowsInStt;
pInfo->id.uid = pScanInfo->uid; pInfo->id.uid = pScanInfo->uid;
@ -2891,8 +2916,8 @@ static void buildCleanBlockFromDataFiles(STsdbReader* pReader, STableBlockScanIn
pInfo->pks[0].val = pBlockInfo->firstPk.val; pInfo->pks[0].val = pBlockInfo->firstPk.val;
pInfo->pks[1].val = pBlockInfo->lastPk.val; pInfo->pks[1].val = pBlockInfo->lastPk.val;
} else { } else {
memcpy(pInfo->pks[0].pData, varDataVal(pBlockInfo->firstPk.pData), varDataLen(pBlockInfo->firstPk.pData)); (void)memcpy(pInfo->pks[0].pData, varDataVal(pBlockInfo->firstPk.pData), varDataLen(pBlockInfo->firstPk.pData));
memcpy(pInfo->pks[1].pData, varDataVal(pBlockInfo->lastPk.pData), varDataLen(pBlockInfo->lastPk.pData)); (void)memcpy(pInfo->pks[1].pData, varDataVal(pBlockInfo->lastPk.pData), varDataLen(pBlockInfo->lastPk.pData));
pInfo->pks[0].nData = varDataLen(pBlockInfo->firstPk.pData); pInfo->pks[0].nData = varDataLen(pBlockInfo->firstPk.pData);
pInfo->pks[1].nData = varDataLen(pBlockInfo->lastPk.pData); pInfo->pks[1].nData = varDataLen(pBlockInfo->lastPk.pData);
@ -2963,8 +2988,8 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
// if only require the total rows, no need to load data from stt file if it is clean stt blocks // if only require the total rows, no need to load data from stt file if it is clean stt blocks
if (pReader->info.execMode == READER_EXEC_ROWS && pScanInfo->cleanSttBlocks) { if (pReader->info.execMode == READER_EXEC_ROWS && pScanInfo->cleanSttBlocks) {
buildCleanBlockFromSttFiles(pReader, pScanInfo); code = buildCleanBlockFromSttFiles(pReader, pScanInfo);
return TSDB_CODE_SUCCESS; return code;
} }
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
@ -3019,11 +3044,16 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter; SDataBlockIter* pBlockIter = &pStatus->blockIter;
STableBlockScanInfo* pScanInfo = NULL; STableBlockScanInfo* pScanInfo = NULL;
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); SFileDataBlockInfo* pBlockInfo = NULL;
SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader; SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader;
bool asc = ASCENDING_TRAVERSE(pReader->info.order); bool asc = ASCENDING_TRAVERSE(pReader->info.order);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
code = getCurrentBlockInfo(pBlockIter, &pBlockInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pBlockInfo->uid, sizeof(pBlockInfo->uid))) { if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pBlockInfo->uid, sizeof(pBlockInfo->uid))) {
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->lastKey, pReader->info.order); setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->lastKey, pReader->info.order);
return code; return code;
@ -3080,8 +3110,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
// return the stt file block // return the stt file block
ASSERT(pReader->info.execMode == READER_EXEC_ROWS && pSttBlockReader->mergeTree.pIter == NULL); ASSERT(pReader->info.execMode == READER_EXEC_ROWS && pSttBlockReader->mergeTree.pIter == NULL);
buildCleanBlockFromSttFiles(pReader, pScanInfo); code = buildCleanBlockFromSttFiles(pReader, pScanInfo);
return TSDB_CODE_SUCCESS; return code;
} }
} else { } else {
SBlockData* pBData = &pReader->status.fileBlockData; SBlockData* pBData = &pReader->status.fileBlockData;
@ -3218,11 +3248,12 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader, int64_t en
// set the correct start position in case of the first/last file block, according to the query time window // set the correct start position in case of the first/last file block, according to the query time window
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) { static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); SFileDataBlockInfo* pBlockInfo = NULL;
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
if (pBlockInfo) { int32_t code = getCurrentBlockInfo(pBlockIter, &pBlockInfo);
if (code != TSDB_CODE_SUCCESS) {
pDumpInfo->totalRows = pBlockInfo->numRow; pDumpInfo->totalRows = pBlockInfo->numRow;
pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->info.order) ? 0 : pBlockInfo->numRow - 1; pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->info.order) ? 0 : pBlockInfo->numRow - 1;
} else { } else {
@ -3255,7 +3286,7 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl
code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks, pTableList); code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks, pTableList);
} else { // no block data, only last block exists } else { // no block data, only last block exists
tBlockDataReset(&pReader->status.fileBlockData); tBlockDataReset(&pReader->status.fileBlockData);
resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo)); code = resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo));
resetTableListIndex(&pReader->status); resetTableListIndex(&pReader->status);
} }
@ -3365,7 +3396,10 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
} }
tBlockDataReset(pBlockData); tBlockDataReset(pBlockData);
resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo)); code = resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo));
if (code != TSDB_CODE_SUCCESS) {
return code;
}
resetTableListIndex(&pReader->status); resetTableListIndex(&pReader->status);
ERetrieveType type = doReadDataFromSttFiles(pReader); ERetrieveType type = doReadDataFromSttFiles(pReader);
@ -3742,7 +3776,12 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
while (1) { while (1) {
CHECK_FILEBLOCK_STATE st; CHECK_FILEBLOCK_STATE st;
SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); SFileDataBlockInfo* pFileBlockInfo = NULL;
int32_t code = getCurrentBlockInfo(&pReader->status.blockIter, &pFileBlockInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (pFileBlockInfo == NULL) { if (pFileBlockInfo == NULL) {
st = CHECK_FILEBLOCK_QUIT; st = CHECK_FILEBLOCK_QUIT;
break; break;
@ -4224,6 +4263,7 @@ uint64_t tsdbGetReaderMaxVersion2(STsdbReader* pReader) { return pReader->info.v
static int32_t doOpenReaderImpl(STsdbReader* pReader) { static int32_t doOpenReaderImpl(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter; SDataBlockIter* pBlockIter = &pStatus->blockIter;
int32_t code = TSDB_CODE_SUCCESS;
if (pReader->bFilesetDelimited) { if (pReader->bFilesetDelimited) {
getMemTableTimeRange(pReader, &pReader->status.memTableMaxKey, &pReader->status.memTableMinKey); getMemTableTimeRange(pReader, &pReader->status.memTableMaxKey, &pReader->status.memTableMinKey);
@ -4231,9 +4271,11 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
} }
initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader); initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader);
resetDataBlockIterator(&pStatus->blockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo)); code = resetDataBlockIterator(&pStatus->blockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo));
if (code != TSDB_CODE_SUCCESS) {
return code;
}
int32_t code = TSDB_CODE_SUCCESS;
if (pStatus->fileIter.numOfFiles == 0) { if (pStatus->fileIter.numOfFiles == 0) {
pStatus->loadFromFile = false; pStatus->loadFromFile = false;
} else { } else {
@ -4278,13 +4320,17 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi
SSDataBlock* pResBlock, void** ppReader, const char* idstr, SHashObj** pIgnoreTables) { SSDataBlock* pResBlock, void** ppReader, const char* idstr, SHashObj** pIgnoreTables) {
STimeWindow window = pCond->twindows; STimeWindow window = pCond->twindows;
SVnodeCfg* pConf = &(((SVnode*)pVnode)->config); SVnodeCfg* pConf = &(((SVnode*)pVnode)->config);
int32_t code = 0;
int32_t capacity = pConf->tsdbCfg.maxRows; int32_t capacity = pConf->tsdbCfg.maxRows;
if (pResBlock != NULL) { if (pResBlock != NULL) {
blockDataEnsureCapacity(pResBlock, capacity); code = blockDataEnsureCapacity(pResBlock, capacity);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
} }
int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, capacity, pResBlock, idstr); code = tsdbReaderCreate(pVnode, pCond, ppReader, capacity, pResBlock, idstr);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _err; goto _err;
} }
@ -4498,7 +4544,7 @@ static int32_t doSuspendCurrentReader(STsdbReader* pCurrentReader) {
tsdbDataFileReaderClose(&pCurrentReader->pFileReader); tsdbDataFileReaderClose(&pCurrentReader->pFileReader);
SReadCostSummary* pCost = &pCurrentReader->cost; SReadCostSummary* pCost = &pCurrentReader->cost;
pStatus->pLDataIterArray = destroySttBlockReader(pStatus->pLDataIterArray, &pCost->sttCost); destroySttBlockReader(pStatus->pLDataIterArray, &pCost->sttCost);
pStatus->pLDataIterArray = taosArrayInit(4, POINTER_BYTES); pStatus->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
} }
@ -4922,9 +4968,9 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_
} }
int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave, bool* hasNullSMA) { int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave, bool* hasNullSMA) {
SColumnDataAgg** pBlockSMA = &pDataBlock->pBlockAgg; SColumnDataAgg** pBlockSMA = &pDataBlock->pBlockAgg;
SFileDataBlockInfo* pBlockInfo = NULL;
int32_t code = 0; int32_t code = 0;
*allHave = false; *allHave = false;
*pBlockSMA = NULL; *pBlockSMA = NULL;
@ -4937,7 +4983,11 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); code = getCurrentBlockInfo(&pReader->status.blockIter, &pBlockInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
SBlockLoadSuppInfo* pSup = &pReader->suppInfo; SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
@ -5020,7 +5070,12 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock,
static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) { static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter); SFileDataBlockInfo* pBlockInfo = NULL;
code = getCurrentBlockInfo(&pStatus->blockIter, &pBlockInfo);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
if (pReader->code != TSDB_CODE_SUCCESS) { if (pReader->code != TSDB_CODE_SUCCESS) {
return NULL; return NULL;
@ -5115,7 +5170,12 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) {
int32_t numOfTables = tSimpleHashGetSize(pStatus->pTableMap); int32_t numOfTables = tSimpleHashGetSize(pStatus->pTableMap);
initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader); initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader);
resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo)); code = resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo));
if (code != TSDB_CODE_SUCCESS) {
tsdbReleaseReader(pReader);
return code;
}
resetTableListIndex(&pReader->status); resetTableListIndex(&pReader->status);
bool asc = ASCENDING_TRAVERSE(pReader->info.order); bool asc = ASCENDING_TRAVERSE(pReader->info.order);
@ -5218,8 +5278,13 @@ int32_t tsdbGetFileBlocksDistInfo2(STsdbReader* pReader, STableBlockDistInfo* pT
while (true) { while (true) {
if (hasNext) { if (hasNext) {
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); SFileDataBlockInfo* pBlockInfo = NULL;
int32_t numOfRows = pBlockInfo->numRow; code = getCurrentBlockInfo(pBlockIter, &pBlockInfo);
if (code != TSDB_CODE_SUCCESS) {
break;
}
int32_t numOfRows = pBlockInfo->numRow;
pTableBlockInfo->totalRows += numOfRows; pTableBlockInfo->totalRows += numOfRows;

View File

@ -358,10 +358,10 @@ int32_t pkCompEx(SRowKey* p1, SRowKey* p2);
int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len, bool asc); int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len, bool asc);
void clearRowKey(SRowKey* pKey); void clearRowKey(SRowKey* pKey);
bool shouldFreePkBuf(SBlockLoadSuppInfo *pSupp); bool shouldFreePkBuf(SBlockLoadSuppInfo* pSupp);
void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool hasPk); int32_t resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool hasPk);
void clearDataBlockIterator(SDataBlockIter* pIter, bool needFree); void clearDataBlockIterator(SDataBlockIter* pIter, bool needFree);
void cleanupDataBlockIterator(SDataBlockIter* pIter, bool hasPk); void cleanupDataBlockIterator(SDataBlockIter* pIter, bool hasPk);
typedef struct { typedef struct {
SArray* pTombData; SArray* pTombData;