Merge pull request #26592 from taosdata/fix/syntax
fix(tsdb): check return value.
This commit is contained in:
commit
103e9194b0
|
@ -136,7 +136,7 @@ int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag);
|
|||
int32_t tRowUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData, int32_t flag);
|
||||
void tRowGetPrimaryKey(SRow *pRow, SRowKey *key);
|
||||
int32_t tRowKeyCompare(const SRowKey *key1, const SRowKey *key2);
|
||||
int32_t tRowKeyAssign(SRowKey *pDst, SRowKey *pSrc);
|
||||
void tRowKeyAssign(SRowKey *pDst, SRowKey *pSrc);
|
||||
|
||||
// SRowIter ================================
|
||||
int32_t tRowIterOpen(SRow *pRow, STSchema *pTSchema, SRowIter **ppIter);
|
||||
|
|
|
@ -17,13 +17,13 @@
|
|||
#define _TD_COMMON_MSG_CB_H_
|
||||
|
||||
#include "os.h"
|
||||
#include "tmsg.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
typedef struct SRpcMsg SRpcMsg;
|
||||
typedef struct SEpSet SEpSet;
|
||||
typedef struct SMgmtWrapper SMgmtWrapper;
|
||||
typedef struct SRpcHandleInfo SRpcHandleInfo;
|
||||
|
||||
|
@ -46,7 +46,7 @@ typedef int32_t (*PutToQueueFp)(void* pMgmt, EQueueType qtype, SRpcMsg* pMsg);
|
|||
typedef int32_t (*GetQueueSizeFp)(void* pMgmt, int32_t vgId, EQueueType qtype);
|
||||
typedef int32_t (*SendReqFp)(const SEpSet* pEpSet, SRpcMsg* pMsg);
|
||||
typedef void (*SendRspFp)(SRpcMsg* pMsg);
|
||||
typedef void (*RegisterBrokenLinkArgFp)(SRpcMsg* pMsg);
|
||||
typedef void (*RegisterBrokenLinkArgFp)(struct SRpcMsg* pMsg);
|
||||
typedef void (*ReleaseHandleFp)(SRpcHandleInfo* pHandle, int8_t type);
|
||||
typedef void (*ReportStartup)(const char* name, const char* desc);
|
||||
|
||||
|
|
|
@ -105,7 +105,7 @@ typedef struct SMTbCursor {
|
|||
} SMTbCursor;
|
||||
|
||||
typedef struct SMCtbCursor {
|
||||
SMeta* pMeta;
|
||||
struct SMeta* pMeta;
|
||||
void* pCur;
|
||||
tb_uid_t suid;
|
||||
void* pKey;
|
||||
|
@ -134,7 +134,7 @@ typedef struct SMetaTableInfo {
|
|||
} SMetaTableInfo;
|
||||
|
||||
typedef struct SSnapContext {
|
||||
SMeta* pMeta;
|
||||
struct SMeta* pMeta;
|
||||
int64_t snapVersion;
|
||||
void* pCur;
|
||||
int64_t suid;
|
||||
|
@ -178,7 +178,7 @@ typedef struct TsdReader {
|
|||
int32_t (*tsdNextDataBlock)();
|
||||
|
||||
int32_t (*tsdReaderRetrieveBlockSMAInfo)();
|
||||
SSDataBlock *(*tsdReaderRetrieveDataBlock)();
|
||||
int32_t (*tsdReaderRetrieveDataBlock)();
|
||||
|
||||
void (*tsdReaderReleaseDataBlock)();
|
||||
|
||||
|
|
|
@ -70,7 +70,6 @@ typedef int64_t SyncIndex;
|
|||
typedef int64_t SyncTerm;
|
||||
|
||||
typedef struct SSyncNode SSyncNode;
|
||||
typedef struct SWal SWal;
|
||||
typedef struct SSyncRaftEntry SSyncRaftEntry;
|
||||
|
||||
typedef enum {
|
||||
|
@ -238,7 +237,7 @@ typedef struct SSyncInfo {
|
|||
int32_t batchSize;
|
||||
SSyncCfg syncCfg;
|
||||
char path[TSDB_FILENAME_LEN];
|
||||
SWal* pWal;
|
||||
struct SWal* pWal;
|
||||
SSyncFSM* pFsm;
|
||||
SMsgCb* msgcb;
|
||||
int32_t pingMs;
|
||||
|
|
|
@ -615,6 +615,10 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex)
|
|||
int32_t index = (tsColumnIndex == -1) ? 0 : tsColumnIndex;
|
||||
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, index);
|
||||
if (pColInfoData == NULL) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (pColInfoData->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
return 0;
|
||||
}
|
||||
|
@ -1514,7 +1518,7 @@ void blockDataFreeRes(SSDataBlock* pBlock) {
|
|||
|
||||
taosArrayDestroy(pBlock->pDataBlock);
|
||||
pBlock->pDataBlock = NULL;
|
||||
|
||||
|
||||
taosMemoryFreeClear(pBlock->pBlockAgg);
|
||||
memset(&pBlock->info, 0, sizeof(SDataBlockInfo));
|
||||
}
|
||||
|
|
|
@ -305,11 +305,11 @@ static int32_t tRowBuildTupleRow(SArray *aColVal, const SRowBuildScanInfo *sinfo
|
|||
*(int32_t *)(fixed + schema->columns[i].offset) = varlen - fixed - sinfo->tupleFixedSize;
|
||||
varlen += tPutU32v(varlen, colValArray[colValIndex].value.nData);
|
||||
if (colValArray[colValIndex].value.nData) {
|
||||
memcpy(varlen, colValArray[colValIndex].value.pData, colValArray[colValIndex].value.nData);
|
||||
(void)memcpy(varlen, colValArray[colValIndex].value.pData, colValArray[colValIndex].value.nData);
|
||||
varlen += colValArray[colValIndex].value.nData;
|
||||
}
|
||||
} else {
|
||||
memcpy(fixed + schema->columns[i].offset, &colValArray[colValIndex].value.val,
|
||||
(void)memcpy(fixed + schema->columns[i].offset, &colValArray[colValIndex].value.val,
|
||||
tDataTypes[schema->columns[i].type].bytes);
|
||||
}
|
||||
} else if (COL_VAL_IS_NULL(&colValArray[colValIndex])) { // NULL
|
||||
|
@ -384,12 +384,12 @@ static int32_t tRowBuildKVRow(SArray *aColVal, const SRowBuildScanInfo *sinfo, c
|
|||
payloadSize += tPutI16v(payload + payloadSize, colValArray[colValIndex].cid);
|
||||
payloadSize += tPutU32v(payload + payloadSize, colValArray[colValIndex].value.nData);
|
||||
if (colValArray[colValIndex].value.nData > 0) {
|
||||
memcpy(payload + payloadSize, colValArray[colValIndex].value.pData, colValArray[colValIndex].value.nData);
|
||||
(void)memcpy(payload + payloadSize, colValArray[colValIndex].value.pData, colValArray[colValIndex].value.nData);
|
||||
}
|
||||
payloadSize += colValArray[colValIndex].value.nData;
|
||||
} else {
|
||||
payloadSize += tPutI16v(payload + payloadSize, colValArray[colValIndex].cid);
|
||||
memcpy(payload + payloadSize, &colValArray[colValIndex].value.val,
|
||||
(void)memcpy(payload + payloadSize, &colValArray[colValIndex].value.val,
|
||||
tDataTypes[schema->columns[i].type].bytes);
|
||||
payloadSize += tDataTypes[schema->columns[i].type].bytes;
|
||||
}
|
||||
|
@ -475,7 +475,7 @@ int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted,
|
|||
value.nData = infos[iInfo].bind->length[iRow];
|
||||
value.pData = (uint8_t *)infos[iInfo].bind->buffer + infos[iInfo].bind->buffer_length * iRow;
|
||||
} else {
|
||||
memcpy(&value.val, (uint8_t *)infos[iInfo].bind->buffer + infos[iInfo].bind->buffer_length * iRow,
|
||||
(void)memcpy(&value.val, (uint8_t *)infos[iInfo].bind->buffer + infos[iInfo].bind->buffer_length * iRow,
|
||||
infos[iInfo].bind->buffer_length);
|
||||
}
|
||||
colVal = COL_VAL_VALUE(infos[iInfo].columnId, value);
|
||||
|
@ -509,7 +509,7 @@ int32_t tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal)
|
|||
pColVal->cid = pTColumn->colId;
|
||||
pColVal->value.type = pTColumn->type;
|
||||
pColVal->flag = CV_FLAG_VALUE;
|
||||
memcpy(&pColVal->value.val, &pRow->ts, sizeof(TSKEY));
|
||||
(void)memcpy(&pColVal->value.val, &pRow->ts, sizeof(TSKEY));
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -573,7 +573,7 @@ int32_t tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal)
|
|||
pColVal->value.pData = NULL;
|
||||
}
|
||||
} else {
|
||||
memcpy(&pColVal->value.val, pData, pTColumn->bytes);
|
||||
(void)memcpy(&pColVal->value.val, pData, pTColumn->bytes);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
|
@ -624,7 +624,7 @@ int32_t tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal)
|
|||
pColVal->value.pData = varlen + *(int32_t *)(fixed + pTColumn->offset);
|
||||
pColVal->value.pData += tGetU32v(pColVal->value.pData, &pColVal->value.nData);
|
||||
} else {
|
||||
memcpy(&pColVal->value.val, fixed + pTColumn->offset, TYPE_BYTES[pTColumn->type]);
|
||||
(void)memcpy(&pColVal->value.val, fixed + pTColumn->offset, TYPE_BYTES[pTColumn->type]);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -861,7 +861,7 @@ SColVal *tRowIterNext(SRowIter *pIter) {
|
|||
pIter->cv.cid = pTColumn->colId;
|
||||
pIter->cv.value.type = pTColumn->type;
|
||||
pIter->cv.flag = CV_FLAG_VALUE;
|
||||
memcpy(&pIter->cv.value.val, &pIter->pRow->ts, sizeof(TSKEY));
|
||||
(void)memcpy(&pIter->cv.value.val, &pIter->pRow->ts, sizeof(TSKEY));
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
|
@ -906,7 +906,7 @@ SColVal *tRowIterNext(SRowIter *pIter) {
|
|||
pIter->cv.value.pData = NULL;
|
||||
}
|
||||
} else {
|
||||
memcpy(&pIter->cv.value.val, pData, pTColumn->bytes);
|
||||
(void)memcpy(&pIter->cv.value.val, pData, pTColumn->bytes);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -965,7 +965,7 @@ SColVal *tRowIterNext(SRowIter *pIter) {
|
|||
pIter->cv.value.pData = NULL;
|
||||
}
|
||||
} else {
|
||||
memcpy(&pIter->cv.value.val, pIter->pf + pTColumn->offset, TYPE_BYTES[pTColumn->type]);
|
||||
(void)memcpy(&pIter->cv.value.val, pIter->pf + pTColumn->offset, TYPE_BYTES[pTColumn->type]);
|
||||
}
|
||||
goto _exit;
|
||||
}
|
||||
|
@ -1288,7 +1288,7 @@ void tRowGetPrimaryKey(SRow *row, SRowKey *key) {
|
|||
key->pks[i].pData = tdata;
|
||||
key->pks[i].pData += tGetU32v(key->pks[i].pData, &key->pks[i].nData);
|
||||
} else {
|
||||
memcpy(&key->pks[i].val, tdata, tDataTypes[indices[i].type].bytes);
|
||||
(void)memcpy(&key->pks[i].val, tdata, tDataTypes[indices[i].type].bytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1378,7 +1378,7 @@ FORCE_INLINE int32_t tRowKeyCompare(const SRowKey *key1, const SRowKey *key2) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tRowKeyAssign(SRowKey *pDst, SRowKey *pSrc) {
|
||||
void tRowKeyAssign(SRowKey *pDst, SRowKey *pSrc) {
|
||||
pDst->ts = pSrc->ts;
|
||||
pDst->numOfPKs = pSrc->numOfPKs;
|
||||
|
||||
|
@ -1392,12 +1392,10 @@ int32_t tRowKeyAssign(SRowKey *pDst, SRowKey *pSrc) {
|
|||
} else {
|
||||
pVal->nData = pSrc->pks[i].nData;
|
||||
ASSERT(pSrc->pks[i].pData != NULL);
|
||||
memcpy(pVal->pData, pSrc->pks[i].pData, pVal->nData);
|
||||
(void)memcpy(pVal->pData, pSrc->pks[i].pData, pVal->nData);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// STag ========================================
|
||||
|
@ -1528,7 +1526,7 @@ static int32_t tPutTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson) {
|
|||
} else {
|
||||
p = p ? p + n : p;
|
||||
n += tDataTypes[pTagVal->type].bytes;
|
||||
if (p) memcpy(p, &(pTagVal->i64), tDataTypes[pTagVal->type].bytes);
|
||||
if (p) (void)memcpy(p, &(pTagVal->i64), tDataTypes[pTagVal->type].bytes);
|
||||
}
|
||||
|
||||
return n;
|
||||
|
@ -1550,7 +1548,7 @@ static int32_t tGetTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson) {
|
|||
if (IS_VAR_DATA_TYPE(pTagVal->type)) {
|
||||
n += tGetBinary(p + n, &pTagVal->pData, &pTagVal->nData);
|
||||
} else {
|
||||
memcpy(&(pTagVal->i64), p + n, tDataTypes[pTagVal->type].bytes);
|
||||
(void)memcpy(&(pTagVal->i64), p + n, tDataTypes[pTagVal->type].bytes);
|
||||
n += tDataTypes[pTagVal->type].bytes;
|
||||
}
|
||||
|
||||
|
@ -1661,7 +1659,7 @@ char *tTagValToData(const STagVal *value, bool isJson) {
|
|||
}
|
||||
|
||||
varDataLen(data + typeBytes) = value->nData;
|
||||
memcpy(varDataVal(data + typeBytes), value->pData, value->nData);
|
||||
(void)memcpy(varDataVal(data + typeBytes), value->pData, value->nData);
|
||||
} else {
|
||||
data = ((char *)&(value->i64)) - typeBytes; // json with type
|
||||
}
|
||||
|
@ -1713,7 +1711,7 @@ bool tTagGet(const STag *pTag, STagVal *pTagVal) {
|
|||
} else if (c > 0) {
|
||||
lidx = midx + 1;
|
||||
} else {
|
||||
memcpy(pTagVal, &tv, sizeof(tv));
|
||||
(void)memcpy(pTagVal, &tv, sizeof(tv));
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
@ -1892,7 +1890,7 @@ static FORCE_INLINE int32_t tColDataPutValue(SColData *pColData, uint8_t *pData,
|
|||
if (nData) {
|
||||
code = tRealloc(&pColData->pData, pColData->nData + nData);
|
||||
if (code) goto _exit;
|
||||
memcpy(pColData->pData + pColData->nData, pData, nData);
|
||||
(void)memcpy(pColData->pData + pColData->nData, pData, nData);
|
||||
pColData->nData += nData;
|
||||
}
|
||||
} else {
|
||||
|
@ -1900,7 +1898,7 @@ static FORCE_INLINE int32_t tColDataPutValue(SColData *pColData, uint8_t *pData,
|
|||
code = tRealloc(&pColData->pData, pColData->nData + tDataTypes[pColData->type].bytes);
|
||||
if (code) goto _exit;
|
||||
if (pData) {
|
||||
memcpy(pColData->pData + pColData->nData, pData, TYPE_BYTES[pColData->type]);
|
||||
(void)memcpy(pColData->pData + pColData->nData, pData, TYPE_BYTES[pColData->type]);
|
||||
} else {
|
||||
memset(pColData->pData + pColData->nData, 0, TYPE_BYTES[pColData->type]);
|
||||
}
|
||||
|
@ -2594,7 +2592,7 @@ static FORCE_INLINE void tColDataGetValue4(SColData *pColData, int32_t iVal, SCo
|
|||
}
|
||||
value.pData = pColData->pData + pColData->aOffset[iVal];
|
||||
} else {
|
||||
memcpy(&value.val, pColData->pData + tDataTypes[pColData->type].bytes * iVal, tDataTypes[pColData->type].bytes);
|
||||
(void)memcpy(&value.val, pColData->pData + tDataTypes[pColData->type].bytes * iVal, tDataTypes[pColData->type].bytes);
|
||||
}
|
||||
*pColVal = COL_VAL_VALUE(pColData->cid, value);
|
||||
}
|
||||
|
@ -2692,7 +2690,7 @@ int32_t tColDataCopy(SColData *pColDataFrom, SColData *pColData, xMallocFn xMall
|
|||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
memcpy(pColData->pBitMap, pColDataFrom->pBitMap, BIT1_SIZE(pColData->nVal));
|
||||
(void)memcpy(pColData->pBitMap, pColDataFrom->pBitMap, BIT1_SIZE(pColData->nVal));
|
||||
break;
|
||||
case (HAS_VALUE | HAS_NULL | HAS_NONE):
|
||||
pColData->pBitMap = xMalloc(arg, BIT2_SIZE(pColData->nVal));
|
||||
|
@ -2700,7 +2698,7 @@ int32_t tColDataCopy(SColData *pColDataFrom, SColData *pColData, xMallocFn xMall
|
|||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
memcpy(pColData->pBitMap, pColDataFrom->pBitMap, BIT2_SIZE(pColData->nVal));
|
||||
(void)memcpy(pColData->pBitMap, pColDataFrom->pBitMap, BIT2_SIZE(pColData->nVal));
|
||||
break;
|
||||
default:
|
||||
pColData->pBitMap = NULL;
|
||||
|
@ -2714,7 +2712,7 @@ int32_t tColDataCopy(SColData *pColDataFrom, SColData *pColData, xMallocFn xMall
|
|||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
memcpy(pColData->aOffset, pColDataFrom->aOffset, pColData->nVal << 2);
|
||||
(void)memcpy(pColData->aOffset, pColDataFrom->aOffset, pColData->nVal << 2);
|
||||
} else {
|
||||
pColData->aOffset = NULL;
|
||||
}
|
||||
|
@ -2727,7 +2725,7 @@ int32_t tColDataCopy(SColData *pColDataFrom, SColData *pColData, xMallocFn xMall
|
|||
goto _exit;
|
||||
}
|
||||
|
||||
memcpy(pColData->pData, pColDataFrom->pData, pColData->nData);
|
||||
(void)memcpy(pColData->pData, pColDataFrom->pData, pColData->nData);
|
||||
} else {
|
||||
pColData->pData = NULL;
|
||||
}
|
||||
|
@ -3119,10 +3117,10 @@ static int32_t tColDataCopyRowCell(SColData *pFromColData, int32_t iFromRow, SCo
|
|||
pToColData->aOffset[iToRow + 1] = pToColData->aOffset[iToRow] + nData;
|
||||
}
|
||||
|
||||
memcpy(pToColData->pData + pToColData->aOffset[iToRow], pFromColData->pData + pFromColData->aOffset[iFromRow],
|
||||
(void)memcpy(pToColData->pData + pToColData->aOffset[iToRow], pFromColData->pData + pFromColData->aOffset[iFromRow],
|
||||
nData);
|
||||
} else {
|
||||
memcpy(&pToColData->pData[TYPE_BYTES[pToColData->type] * iToRow],
|
||||
(void)memcpy(&pToColData->pData[TYPE_BYTES[pToColData->type] * iToRow],
|
||||
&pFromColData->pData[TYPE_BYTES[pToColData->type] * iFromRow], TYPE_BYTES[pToColData->type]);
|
||||
}
|
||||
return code;
|
||||
|
@ -3361,7 +3359,7 @@ static void tColDataMergeImpl(SColData *pColData, int32_t iStart, int32_t iEnd /
|
|||
ASSERT(0);
|
||||
} else {
|
||||
if (iv != iStart) {
|
||||
memcpy(&pColData->pData[TYPE_BYTES[pColData->type] * iStart],
|
||||
(void)memcpy(&pColData->pData[TYPE_BYTES[pColData->type] * iStart],
|
||||
&pColData->pData[TYPE_BYTES[pColData->type] * iv], TYPE_BYTES[pColData->type]);
|
||||
}
|
||||
memmove(&pColData->pData[TYPE_BYTES[pColData->type] * (iStart + 1)],
|
||||
|
@ -3579,11 +3577,11 @@ static int32_t tPutColDataVersion0(uint8_t *pBuf, SColData *pColData) {
|
|||
case (HAS_NULL | HAS_NONE):
|
||||
case (HAS_VALUE | HAS_NONE):
|
||||
case (HAS_VALUE | HAS_NULL):
|
||||
if (pBuf) memcpy(pBuf + n, pColData->pBitMap, BIT1_SIZE(pColData->nVal));
|
||||
if (pBuf) (void)memcpy(pBuf + n, pColData->pBitMap, BIT1_SIZE(pColData->nVal));
|
||||
n += BIT1_SIZE(pColData->nVal);
|
||||
break;
|
||||
case (HAS_VALUE | HAS_NULL | HAS_NONE):
|
||||
if (pBuf) memcpy(pBuf + n, pColData->pBitMap, BIT2_SIZE(pColData->nVal));
|
||||
if (pBuf) (void)memcpy(pBuf + n, pColData->pBitMap, BIT2_SIZE(pColData->nVal));
|
||||
n += BIT2_SIZE(pColData->nVal);
|
||||
break;
|
||||
default:
|
||||
|
@ -3593,14 +3591,14 @@ static int32_t tPutColDataVersion0(uint8_t *pBuf, SColData *pColData) {
|
|||
// value
|
||||
if (pColData->flag & HAS_VALUE) {
|
||||
if (IS_VAR_DATA_TYPE(pColData->type)) {
|
||||
if (pBuf) memcpy(pBuf + n, pColData->aOffset, pColData->nVal << 2);
|
||||
if (pBuf) (void)memcpy(pBuf + n, pColData->aOffset, pColData->nVal << 2);
|
||||
n += (pColData->nVal << 2);
|
||||
|
||||
n += tPutI32v(pBuf ? pBuf + n : NULL, pColData->nData);
|
||||
if (pBuf) memcpy(pBuf + n, pColData->pData, pColData->nData);
|
||||
if (pBuf) (void)memcpy(pBuf + n, pColData->pData, pColData->nData);
|
||||
n += pColData->nData;
|
||||
} else {
|
||||
if (pBuf) memcpy(pBuf + n, pColData->pData, pColData->nData);
|
||||
if (pBuf) (void)memcpy(pBuf + n, pColData->pData, pColData->nData);
|
||||
n += pColData->nData;
|
||||
}
|
||||
}
|
||||
|
@ -4348,7 +4346,7 @@ int32_t tCompressData(void *input, // input
|
|||
ASSERT(outputSize >= extraSizeNeeded);
|
||||
|
||||
if (info->cmprAlg == NO_COMPRESSION) {
|
||||
memcpy(output, input, info->originalSize);
|
||||
(void)memcpy(output, input, info->originalSize);
|
||||
info->compressedSize = info->originalSize;
|
||||
} else if (info->cmprAlg == ONE_STAGE_COMP || info->cmprAlg == TWO_STAGE_COMP) {
|
||||
SBuffer local;
|
||||
|
@ -4385,7 +4383,7 @@ int32_t tCompressData(void *input, // input
|
|||
} else {
|
||||
DEFINE_VAR(info->cmprAlg)
|
||||
if ((l1 == L1_UNKNOWN && l2 == L2_UNKNOWN) || (l1 == L1_DISABLED && l2 == L2_DISABLED)) {
|
||||
memcpy(output, input, info->originalSize);
|
||||
(void)memcpy(output, input, info->originalSize);
|
||||
info->compressedSize = info->originalSize;
|
||||
return 0;
|
||||
}
|
||||
|
@ -4431,7 +4429,7 @@ int32_t tDecompressData(void *input, // input
|
|||
|
||||
if (info->cmprAlg == NO_COMPRESSION) {
|
||||
ASSERT(info->compressedSize == info->originalSize);
|
||||
memcpy(output, input, info->compressedSize);
|
||||
(void)memcpy(output, input, info->compressedSize);
|
||||
} else if (info->cmprAlg == ONE_STAGE_COMP || info->cmprAlg == TWO_STAGE_COMP) {
|
||||
SBuffer local;
|
||||
|
||||
|
@ -4468,7 +4466,7 @@ int32_t tDecompressData(void *input, // input
|
|||
} else {
|
||||
DEFINE_VAR(info->cmprAlg);
|
||||
if (l1 == L1_DISABLED && l2 == L2_DISABLED) {
|
||||
memcpy(output, input, info->compressedSize);
|
||||
(void)memcpy(output, input, info->compressedSize);
|
||||
return 0;
|
||||
}
|
||||
SBuffer local;
|
||||
|
|
|
@ -167,7 +167,7 @@ void tsdbReaderClose2(STsdbReader *pReader);
|
|||
int32_t tsdbNextDataBlock2(STsdbReader *pReader, bool *hasNext);
|
||||
int32_t tsdbRetrieveDatablockSMA2(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave, bool *hasNullSMA);
|
||||
void tsdbReleaseDataBlock2(STsdbReader *pReader);
|
||||
SSDataBlock *tsdbRetrieveDataBlock2(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
|
||||
int32_t tsdbRetrieveDataBlock2(STsdbReader *pReader, SSDataBlock **pBlock, SArray *pIdList);
|
||||
int32_t tsdbReaderReset2(STsdbReader *pReader, SQueryTableDataCond *pCond);
|
||||
int32_t tsdbGetFileBlocksDistInfo2(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
|
||||
int64_t tsdbGetNumOfRowsInMemTable2(STsdbReader *pHandle);
|
||||
|
|
|
@ -909,9 +909,9 @@ void tMergeTreeUnpinSttBlock(SMergeTree *pMTree);
|
|||
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree);
|
||||
void tMergeTreeClose(SMergeTree *pMTree);
|
||||
|
||||
SSttBlockLoadInfo *tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols);
|
||||
void * destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
|
||||
void * destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoadCost);
|
||||
int32_t tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols, SSttBlockLoadInfo **pInfo);
|
||||
void destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
|
||||
void destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoadCost);
|
||||
|
||||
// tsdbCache ==============================================================================================
|
||||
typedef enum {
|
||||
|
|
|
@ -54,7 +54,6 @@ extern "C" {
|
|||
#endif
|
||||
|
||||
typedef struct SVnodeInfo SVnodeInfo;
|
||||
typedef struct SMeta SMeta;
|
||||
typedef struct SSma SSma;
|
||||
typedef struct STsdb STsdb;
|
||||
typedef struct STQ STQ;
|
||||
|
@ -153,7 +152,6 @@ int32_t vnodeBufPoolRegisterQuery(SVBufPool* pPool, SQueryNode* pQNode);
|
|||
void vnodeBufPoolDeregisterQuery(SVBufPool* pPool, SQueryNode* pQNode, bool proactive);
|
||||
|
||||
// meta
|
||||
typedef struct SMCtbCursor SMCtbCursor;
|
||||
typedef struct SMStbCursor SMStbCursor;
|
||||
typedef struct STbUidStore STbUidStore;
|
||||
|
||||
|
|
|
@ -2579,7 +2579,7 @@ typedef struct {
|
|||
static int32_t lastIterOpen(SFSLastIter *iter, STFileSet *pFileSet, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
|
||||
tb_uid_t uid, SCacheRowsReader *pr, int64_t lastTs, int16_t *aCols, int nCols) {
|
||||
int32_t code = 0;
|
||||
pr->pLDataIterArray = destroySttBlockReader(pr->pLDataIterArray, NULL);
|
||||
destroySttBlockReader(pr->pLDataIterArray, NULL);
|
||||
pr->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
|
||||
|
||||
SMergeTreeConf conf = {
|
||||
|
@ -3212,7 +3212,10 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
|
|||
|
||||
pIter->pMemDelData = NULL;
|
||||
|
||||
loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer);
|
||||
code = loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
|
||||
|
||||
|
|
|
@ -203,7 +203,7 @@ int32_t tsdbReuseCacherowsReader(void* reader, void* pTableIdList, int32_t numOf
|
|||
pReader->pTableList = pTableIdList;
|
||||
pReader->numOfTables = numOfTables;
|
||||
pReader->lastTs = INT64_MIN;
|
||||
pReader->pLDataIterArray = destroySttBlockReader(pReader->pLDataIterArray, NULL);
|
||||
destroySttBlockReader(pReader->pLDataIterArray, NULL);
|
||||
pReader->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -23,11 +23,12 @@
|
|||
static void tLDataIterClose2(SLDataIter *pIter);
|
||||
|
||||
// SLDataIter =================================================
|
||||
SSttBlockLoadInfo *tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols) {
|
||||
int32_t tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols, SSttBlockLoadInfo **pInfo) {
|
||||
*pInfo = NULL;
|
||||
|
||||
SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(1, sizeof(SSttBlockLoadInfo));
|
||||
if (pLoadInfo == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pLoadInfo->blockData[0].sttBlockIndex = -1;
|
||||
|
@ -37,26 +38,29 @@ SSttBlockLoadInfo *tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList,
|
|||
|
||||
int32_t code = tBlockDataCreate(&pLoadInfo->blockData[0].data);
|
||||
if (code) {
|
||||
terrno = code;
|
||||
taosMemoryFreeClear(pLoadInfo);
|
||||
return code;
|
||||
}
|
||||
|
||||
code = tBlockDataCreate(&pLoadInfo->blockData[1].data);
|
||||
if (code) {
|
||||
terrno = code;
|
||||
taosMemoryFreeClear(pLoadInfo);
|
||||
return code;
|
||||
}
|
||||
|
||||
pLoadInfo->aSttBlk = taosArrayInit(4, sizeof(SSttBlk));
|
||||
if (pLoadInfo->aSttBlk == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosMemoryFreeClear(pLoadInfo);
|
||||
return NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
pLoadInfo->pSchema = pSchema;
|
||||
pLoadInfo->colIds = colList;
|
||||
pLoadInfo->numOfCols = numOfCols;
|
||||
|
||||
return pLoadInfo;
|
||||
*pInfo = pLoadInfo;
|
||||
return code;
|
||||
}
|
||||
|
||||
static void freeItem(void* pValue) {
|
||||
|
@ -66,9 +70,9 @@ static void freeItem(void* pValue) {
|
|||
}
|
||||
}
|
||||
|
||||
void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
|
||||
void destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
|
||||
if (pLoadInfo == NULL) {
|
||||
return NULL;
|
||||
return;
|
||||
}
|
||||
|
||||
pLoadInfo->currentLoadBlockIndex = 1;
|
||||
|
@ -94,7 +98,6 @@ void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
|
|||
|
||||
taosArrayDestroy(pLoadInfo->aSttBlk);
|
||||
taosMemoryFree(pLoadInfo);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void destroyLDataIter(SLDataIter *pIter) {
|
||||
|
@ -103,9 +106,9 @@ void destroyLDataIter(SLDataIter *pIter) {
|
|||
taosMemoryFree(pIter);
|
||||
}
|
||||
|
||||
void *destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoadCost) {
|
||||
void destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoadCost) {
|
||||
if (pLDataIterArray == NULL) {
|
||||
return NULL;
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t numOfLevel = taosArrayGetSize(pLDataIterArray);
|
||||
|
@ -132,7 +135,6 @@ void *destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoa
|
|||
}
|
||||
|
||||
taosArrayDestroy(pLDataIterArray);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// choose the unpinned slot to load next data block
|
||||
|
@ -914,9 +916,8 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF
|
|||
}
|
||||
|
||||
if (pLoadInfo == NULL) {
|
||||
pLoadInfo = tCreateSttBlockLoadInfo(pConf->pSchema, pConf->pCols, pConf->numOfCols);
|
||||
if (pLoadInfo == NULL) {
|
||||
code = terrno;
|
||||
code = tCreateSttBlockLoadInfo(pConf->pSchema, pConf->pCols, pConf->numOfCols, &pLoadInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _end;
|
||||
}
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -112,22 +112,28 @@ void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf) {
|
|||
taosArrayDestroy(pBuf->pData);
|
||||
}
|
||||
|
||||
void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) {
|
||||
int32_t getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index, STableBlockScanInfo** pInfo) {
|
||||
*pInfo = NULL;
|
||||
|
||||
int32_t bucketIndex = index / pBuf->numPerBucket;
|
||||
char** pBucket = taosArrayGet(pBuf->pData, bucketIndex);
|
||||
return (*pBucket) + (index % pBuf->numPerBucket) * sizeof(STableBlockScanInfo);
|
||||
}
|
||||
|
||||
STableBlockScanInfo* getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, const char* id) {
|
||||
STableBlockScanInfo** p = tSimpleHashGet(pTableMap, &uid, sizeof(uid));
|
||||
if (p == NULL || *p == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
int32_t size = tSimpleHashGetSize(pTableMap);
|
||||
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", uid, size, id);
|
||||
return NULL;
|
||||
if (pBucket == NULL) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
return *p;
|
||||
*pInfo = (STableBlockScanInfo*)((*pBucket) + (index % pBuf->numPerBucket) * sizeof(STableBlockScanInfo));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, STableBlockScanInfo** pInfo, const char* id) {
|
||||
*pInfo = *(STableBlockScanInfo**)tSimpleHashGet(pTableMap, &uid, sizeof(uid));
|
||||
if (pInfo == NULL) {
|
||||
int32_t size = tSimpleHashGetSize(pTableMap);
|
||||
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", uid, size, id);
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len, bool asc) {
|
||||
|
@ -146,17 +152,17 @@ int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, in
|
|||
}
|
||||
case TSDB_DATA_TYPE_INT:{
|
||||
int32_t min = INT32_MIN;
|
||||
memcpy(&pKey->pks[0].val, &min, tDataTypes[type].bytes);
|
||||
(void)memcpy(&pKey->pks[0].val, &min, tDataTypes[type].bytes);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_SMALLINT:{
|
||||
int16_t min = INT16_MIN;
|
||||
memcpy(&pKey->pks[0].val, &min, tDataTypes[type].bytes);
|
||||
(void)memcpy(&pKey->pks[0].val, &min, tDataTypes[type].bytes);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_TINYINT:{
|
||||
int8_t min = INT8_MIN;
|
||||
memcpy(&pKey->pks[0].val, &min, tDataTypes[type].bytes);
|
||||
(void)memcpy(&pKey->pks[0].val, &min, tDataTypes[type].bytes);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_UTINYINT:
|
||||
|
@ -245,20 +251,27 @@ int32_t initTableBlockScanInfo(STableBlockScanInfo* pScanInfo, uint64_t uid, SSH
|
|||
initLastProcKey(pScanInfo, pReader);
|
||||
|
||||
pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT;
|
||||
tSimpleHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
|
||||
int32_t code = tSimpleHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pReader, pScanInfo->uid,
|
||||
pScanInfo->lastProcKey.ts, pReader->idStr);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
// NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model
|
||||
SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList,
|
||||
STableUidList* pUidList, int32_t numOfTables) {
|
||||
int32_t createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList,
|
||||
STableUidList* pUidList, int32_t numOfTables, SSHashObj** pHashObj) {
|
||||
int32_t code = 0;
|
||||
*pHashObj = NULL;
|
||||
|
||||
// allocate buffer in order to load data blocks from file
|
||||
// todo use simple hash instead, optimize the memory consumption
|
||||
SSHashObj* pTableMap = tSimpleHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
|
||||
if (pTableMap == NULL) {
|
||||
return NULL;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
@ -267,7 +280,7 @@ SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf
|
|||
pUidList->tableUidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
|
||||
if (pUidList->tableUidList == NULL) {
|
||||
tSimpleHashCleanup(pTableMap);
|
||||
return NULL;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pUidList->currentIndex = 0;
|
||||
|
@ -275,8 +288,16 @@ SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf
|
|||
for (int32_t j = 0; j < numOfTables; ++j) {
|
||||
pUidList->tableUidList[j] = idList[j].uid;
|
||||
|
||||
STableBlockScanInfo* pScanInfo = getPosInBlockInfoBuf(pBuf, j);
|
||||
initTableBlockScanInfo(pScanInfo, idList[j].uid, pTableMap, pTsdbReader);
|
||||
STableBlockScanInfo* pScanInfo = NULL;
|
||||
code = getPosInBlockInfoBuf(pBuf, j, &pScanInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
break;
|
||||
}
|
||||
|
||||
code = initTableBlockScanInfo(pScanInfo, idList[j].uid, pTableMap, pTsdbReader);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
taosSort(pUidList->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc);
|
||||
|
@ -286,7 +307,8 @@ SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf
|
|||
(sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->cost.createScanInfoList,
|
||||
pTsdbReader->idStr);
|
||||
|
||||
return pTableMap;
|
||||
*pHashObj = pTableMap;
|
||||
return code;
|
||||
}
|
||||
|
||||
void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t step) {
|
||||
|
@ -391,11 +413,13 @@ void initBrinRecordIter(SBrinRecordIter* pIter, SDataFileReader* pReader, SArray
|
|||
pIter->pBrinBlockList = pList;
|
||||
}
|
||||
|
||||
SBrinRecord* getNextBrinRecord(SBrinRecordIter* pIter) {
|
||||
int32_t getNextBrinRecord(SBrinRecordIter* pIter, SBrinRecord** pRecord) {
|
||||
*pRecord = NULL;
|
||||
|
||||
if (pIter->blockIndex == -1 || (pIter->recordIndex + 1) >= pIter->block.numOfRecords) {
|
||||
pIter->blockIndex += 1;
|
||||
if (pIter->blockIndex >= taosArrayGetSize(pIter->pBrinBlockList)) {
|
||||
return NULL;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
pIter->pCurrentBlk = taosArrayGet(pIter->pBrinBlockList, pIter->blockIndex);
|
||||
|
@ -404,7 +428,7 @@ SBrinRecord* getNextBrinRecord(SBrinRecordIter* pIter) {
|
|||
int32_t code = tsdbDataFileReadBrinBlock(pIter->pReader, pIter->pCurrentBlk, &pIter->block);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("failed to read brinBlock from file, code:%s", tstrerror(code));
|
||||
return NULL;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
pIter->recordIndex = -1;
|
||||
|
@ -412,7 +436,9 @@ SBrinRecord* getNextBrinRecord(SBrinRecordIter* pIter) {
|
|||
|
||||
pIter->recordIndex += 1;
|
||||
tBrinBlockGet(&pIter->block, pIter->recordIndex, &pIter->record);
|
||||
return &pIter->record;
|
||||
*pRecord = &pIter->record;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void clearBrinBlockIter(SBrinRecordIter* pIter) { tBrinBlockDestroy(&pIter->block); }
|
||||
|
@ -670,7 +696,13 @@ static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_
|
|||
STombRecord record = {0};
|
||||
|
||||
uint64_t uid = pReader->status.uidList.tableUidList[*j];
|
||||
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
|
||||
STableBlockScanInfo* pScanInfo = NULL;
|
||||
|
||||
code = getTableBlockScanInfo(pReader->status.pTableMap, uid, &pScanInfo, pReader->idStr);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pScanInfo->pFileDelData == NULL) {
|
||||
pScanInfo->pFileDelData = taosArrayInit(4, sizeof(SDelData));
|
||||
}
|
||||
|
@ -702,7 +734,11 @@ static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_
|
|||
}
|
||||
|
||||
uid = pReader->status.uidList.tableUidList[*j];
|
||||
pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
|
||||
code = getTableBlockScanInfo(pReader->status.pTableMap, uid, &pScanInfo, pReader->idStr);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pScanInfo->pFileDelData == NULL) {
|
||||
pScanInfo->pFileDelData = taosArrayInit(4, sizeof(SDelData));
|
||||
}
|
||||
|
@ -806,9 +842,12 @@ int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileRead
|
|||
return doLoadTombDataFromTombBlk(pBlkArray, pReader, pSttFileReader, false);
|
||||
}
|
||||
|
||||
void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemTbData, int64_t ver) {
|
||||
int32_t loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemTbData, int64_t ver) {
|
||||
if (*ppMemDelData == NULL) {
|
||||
*ppMemDelData = taosArrayInit(4, sizeof(SDelData));
|
||||
if (*ppMemDelData == NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
SArray* pMemDelData = *ppMemDelData;
|
||||
|
@ -836,6 +875,8 @@ void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemT
|
|||
p = p->pNext;
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pBlockLoadInfo,
|
||||
|
@ -960,6 +1001,7 @@ int32_t adjustSttDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet)
|
|||
int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArray, STsdb* pTsdb, SMergeTreeConf* pConf,
|
||||
const char* pstr) {
|
||||
int32_t numOfRows = 0;
|
||||
int32_t code = 0;
|
||||
|
||||
// no data exists, go to end
|
||||
int32_t numOfLevels = pFileSet->lvlArr->size;
|
||||
|
@ -984,7 +1026,7 @@ int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArra
|
|||
conf.file[0] = *pSttLevel->fobjArr->data[i]->f;
|
||||
|
||||
const char* pName = pSttLevel->fobjArr->data[i]->fname;
|
||||
int32_t code = tsdbSttFileReaderOpen(pName, &conf, &pIter->pReader);
|
||||
code = tsdbSttFileReaderOpen(pName, &conf, &pIter->pReader);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("open stt file reader error. file:%s, code %s, %s", pName, tstrerror(code), pstr);
|
||||
continue;
|
||||
|
@ -992,8 +1034,8 @@ int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArra
|
|||
}
|
||||
|
||||
if (pIter->pBlockLoadInfo == NULL) {
|
||||
pIter->pBlockLoadInfo = tCreateSttBlockLoadInfo(pConf->pSchema, pConf->pCols, pConf->numOfCols);
|
||||
if (pIter->pBlockLoadInfo == NULL) {
|
||||
code = tCreateSttBlockLoadInfo(pConf->pSchema, pConf->pCols, pConf->numOfCols, &pIter->pBlockLoadInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("failed to create block load info, code: out of memory, %s", pstr);
|
||||
continue;
|
||||
}
|
||||
|
@ -1001,7 +1043,7 @@ int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArra
|
|||
|
||||
// load stt blocks statis for all stt-blocks, to decide if the data of queried table exists in current stt file
|
||||
TStatisBlkArray* pStatisBlkArray = NULL;
|
||||
int32_t code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray**)&pStatisBlkArray);
|
||||
code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray**)&pStatisBlkArray);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("failed to load stt block statistics, code:%s, %s", tstrerror(code), pstr);
|
||||
continue;
|
||||
|
|
|
@ -315,32 +315,31 @@ typedef struct SBrinRecordIter {
|
|||
} SBrinRecordIter;
|
||||
|
||||
int32_t uidComparFunc(const void* p1, const void* p2);
|
||||
int32_t getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, STableBlockScanInfo** pInfo, const char* id);
|
||||
|
||||
STableBlockScanInfo* getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, const char* id);
|
||||
|
||||
SSHashObj* createDataBlockScanInfo(STsdbReader* pReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList,
|
||||
STableUidList* pUidList, int32_t numOfTables);
|
||||
int32_t initTableBlockScanInfo(STableBlockScanInfo* pScanInfo, uint64_t uid, SSHashObj* pTableMap,
|
||||
STsdbReader* pReader);
|
||||
void clearBlockScanInfo(STableBlockScanInfo* p);
|
||||
void destroyAllBlockScanInfo(SSHashObj* pTableMap);
|
||||
void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t step);
|
||||
void cleanupInfoForNextFileset(SSHashObj* pTableMap);
|
||||
int32_t ensureBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables);
|
||||
void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf);
|
||||
void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index);
|
||||
int32_t createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList,
|
||||
STableUidList* pUidList, int32_t numOfTables, SSHashObj** pHashObj);
|
||||
int32_t initTableBlockScanInfo(STableBlockScanInfo* pScanInfo, uint64_t uid, SSHashObj* pTableMap,
|
||||
STsdbReader* pReader);
|
||||
void clearBlockScanInfo(STableBlockScanInfo* p);
|
||||
void destroyAllBlockScanInfo(SSHashObj* pTableMap);
|
||||
void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t step);
|
||||
void cleanupInfoForNextFileset(SSHashObj* pTableMap);
|
||||
int32_t ensureBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables);
|
||||
void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf);
|
||||
int32_t getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index, STableBlockScanInfo** pRes);
|
||||
|
||||
// brin records iterator
|
||||
void initBrinRecordIter(SBrinRecordIter* pIter, SDataFileReader* pReader, SArray* pList);
|
||||
SBrinRecord* getNextBrinRecord(SBrinRecordIter* pIter);
|
||||
void clearBrinBlockIter(SBrinRecordIter* pIter);
|
||||
void initBrinRecordIter(SBrinRecordIter* pIter, SDataFileReader* pReader, SArray* pList);
|
||||
int32_t getNextBrinRecord(SBrinRecordIter* pIter, SBrinRecord** pRecord);
|
||||
void clearBrinBlockIter(SBrinRecordIter* pIter);
|
||||
|
||||
// initialize block iterator API
|
||||
int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks, SArray* pTableList);
|
||||
bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr);
|
||||
|
||||
// load tomb data API (stt/mem only for one table each, tomb data from data files are load for all tables at one time)
|
||||
void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemTbData, int64_t ver);
|
||||
int32_t loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemTbData, int64_t ver);
|
||||
int32_t loadDataFileTombDataForAll(STsdbReader* pReader);
|
||||
int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pLoadInfo);
|
||||
int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pBlockLoadInfo,
|
||||
|
@ -358,10 +357,10 @@ int32_t pkCompEx(SRowKey* p1, SRowKey* p2);
|
|||
int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len, bool asc);
|
||||
void clearRowKey(SRowKey* pKey);
|
||||
|
||||
bool shouldFreePkBuf(SBlockLoadSuppInfo *pSupp);
|
||||
void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool hasPk);
|
||||
void clearDataBlockIterator(SDataBlockIter* pIter, bool needFree);
|
||||
void cleanupDataBlockIterator(SDataBlockIter* pIter, bool hasPk);
|
||||
bool shouldFreePkBuf(SBlockLoadSuppInfo* pSupp);
|
||||
int32_t resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool hasPk);
|
||||
void clearDataBlockIterator(SDataBlockIter* pIter, bool needFree);
|
||||
void cleanupDataBlockIterator(SDataBlockIter* pIter, bool hasPk);
|
||||
|
||||
typedef struct {
|
||||
SArray* pTombData;
|
||||
|
|
|
@ -388,9 +388,10 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
|
|||
pCost->totalCheckedRows += pBlock->info.rows;
|
||||
pCost->loadBlocks += 1;
|
||||
|
||||
SSDataBlock* p = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
|
||||
if (p == NULL) {
|
||||
return terrno;
|
||||
SSDataBlock* p = NULL;
|
||||
int32_t code = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pTableScanInfo->dataReader, &p, NULL);
|
||||
if (p == NULL || code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
ASSERT(p == pBlock);
|
||||
|
@ -1351,7 +1352,12 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
|
|||
}
|
||||
|
||||
if (hasNext) {
|
||||
/*SSDataBlock* p = */ pAPI->tsdReader.tsdReaderRetrieveDataBlock(pReader, NULL);
|
||||
SSDataBlock* p = NULL;
|
||||
code = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pReader, &p, NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows);
|
||||
pBlock->info.id.groupId = tableListGetTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
|
||||
}
|
||||
|
@ -2929,8 +2935,9 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
|
|||
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
||||
}
|
||||
|
||||
SSDataBlock* pBlock = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pInfo->dataReader, NULL);
|
||||
if (pBlock == NULL) {
|
||||
SSDataBlock* pBlock = NULL;
|
||||
code = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pInfo->dataReader, &pBlock, NULL);
|
||||
if (pBlock == NULL || code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||
}
|
||||
|
||||
|
|
|
@ -1139,7 +1139,6 @@ void destroyIntervalOperatorInfo(void* param) {
|
|||
taosArrayDestroy(pInfo->pInterpCols);
|
||||
pInfo->pInterpCols = NULL;
|
||||
taosArrayDestroyEx(pInfo->pPrevValues, freeItem);
|
||||
|
||||
pInfo->pPrevValues = NULL;
|
||||
|
||||
cleanupGroupResInfo(&pInfo->groupResInfo);
|
||||
|
|
|
@ -310,7 +310,7 @@ void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo) {
|
|||
|
||||
taosArrayDestroy(pInfo->pList);
|
||||
pInfo->pList = NULL;
|
||||
|
||||
|
||||
if (pInfo->checkRspTmr != NULL) {
|
||||
/*bool ret = */ taosTmrStop(pInfo->checkRspTmr);
|
||||
pInfo->checkRspTmr = NULL;
|
||||
|
|
|
@ -74,16 +74,16 @@ typedef struct SRaftStore {
|
|||
TdThreadMutex mutex;
|
||||
} SRaftStore;
|
||||
|
||||
typedef struct SSyncHbTimerData {
|
||||
struct SSyncHbTimerData {
|
||||
int64_t syncNodeRid;
|
||||
SSyncTimer* pTimer;
|
||||
SRaftId destId;
|
||||
uint64_t logicClock;
|
||||
int64_t execTime;
|
||||
int64_t rid;
|
||||
} SSyncHbTimerData;
|
||||
};
|
||||
|
||||
typedef struct SSyncTimer {
|
||||
struct SSyncTimer {
|
||||
void* pTimer;
|
||||
TAOS_TMR_CALLBACK timerCb;
|
||||
uint64_t logicClock;
|
||||
|
@ -92,7 +92,7 @@ typedef struct SSyncTimer {
|
|||
int64_t timeStamp;
|
||||
SRaftId destId;
|
||||
int64_t hbDataRid;
|
||||
} SSyncTimer;
|
||||
};
|
||||
|
||||
typedef struct SElectTimerParam {
|
||||
uint64_t logicClock;
|
||||
|
@ -106,7 +106,7 @@ typedef struct SPeerState {
|
|||
int64_t lastSendTime;
|
||||
} SPeerState;
|
||||
|
||||
typedef struct SSyncNode {
|
||||
struct SSyncNode {
|
||||
// init by SSyncInfo
|
||||
SyncGroupId vgId;
|
||||
SRaftCfg raftCfg;
|
||||
|
@ -116,7 +116,7 @@ typedef struct SSyncNode {
|
|||
|
||||
// sync io
|
||||
SSyncLogBuffer* pLogBuf;
|
||||
SWal* pWal;
|
||||
struct SWal* pWal;
|
||||
const SMsgCb* msgcb;
|
||||
int32_t (*syncSendMSg)(const SEpSet* pEpSet, SRpcMsg* pMsg);
|
||||
int32_t (*syncEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
|
||||
|
@ -234,7 +234,7 @@ typedef struct SSyncNode {
|
|||
|
||||
bool isStart;
|
||||
|
||||
} SSyncNode;
|
||||
};
|
||||
|
||||
// open/close --------------
|
||||
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion);
|
||||
|
|
|
@ -103,7 +103,7 @@ typedef void* queue[2];
|
|||
#define TRANS_MAGIC_NUM 0x5f375a86
|
||||
#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0)
|
||||
|
||||
typedef SRpcMsg STransMsg;
|
||||
typedef struct SRpcMsg STransMsg;
|
||||
typedef SRpcCtx STransCtx;
|
||||
typedef SRpcCtxVal STransCtxVal;
|
||||
typedef SRpcInfo STrans;
|
||||
|
|
|
@ -120,7 +120,8 @@ endi
|
|||
if $data01 != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data02 != 1 then
|
||||
if $data02 != 1 then
|
||||
print expect 1 , actual $data02
|
||||
return -1
|
||||
endi
|
||||
if $data03 != 1 then
|
||||
|
|
Loading…
Reference in New Issue