Merge pull request #20122 from taosdata/enh/TD-22715
fix: fix distributed table error
This commit is contained in:
commit
6120c73aaa
|
@ -205,7 +205,7 @@ typedef struct SDataBlockInfo {
|
|||
STimeWindow calWin; // used for stream, do not serialize
|
||||
TSKEY watermark; // used for stream
|
||||
|
||||
char parTbName[TSDB_TABLE_NAME_LEN]; // used for stream partition
|
||||
char parTbName[TSDB_TABLE_NAME_LEN]; // used for stream partition
|
||||
} SDataBlockInfo;
|
||||
|
||||
typedef struct SSDataBlock {
|
||||
|
@ -291,7 +291,6 @@ typedef struct STableBlockDistInfo {
|
|||
uint16_t numOfFiles;
|
||||
uint32_t numOfTables;
|
||||
uint32_t numOfBlocks;
|
||||
uint32_t numOfVgroups;
|
||||
uint64_t totalSize;
|
||||
uint64_t totalRows;
|
||||
int32_t maxRows;
|
||||
|
@ -301,6 +300,7 @@ typedef struct STableBlockDistInfo {
|
|||
int32_t firstSeekTimeUs;
|
||||
uint32_t numOfInmemRows;
|
||||
uint32_t numOfSmallBlocks;
|
||||
uint32_t numOfVgroups;
|
||||
int32_t blockRowsHisto[20];
|
||||
} STableBlockDistInfo;
|
||||
|
||||
|
@ -341,7 +341,7 @@ typedef struct SExprInfo {
|
|||
|
||||
typedef struct {
|
||||
const char* key;
|
||||
size_t keyLen;
|
||||
size_t keyLen;
|
||||
uint8_t type;
|
||||
union {
|
||||
const char* value;
|
||||
|
@ -385,9 +385,9 @@ typedef struct STUidTagInfo {
|
|||
#define TABLE_NAME_COLUMN_INDEX 6
|
||||
|
||||
// stream create table block column
|
||||
#define UD_TABLE_NAME_COLUMN_INDEX 0
|
||||
#define UD_GROUPID_COLUMN_INDEX 1
|
||||
#define UD_TAG_COLUMN_INDEX 2
|
||||
#define UD_TABLE_NAME_COLUMN_INDEX 0
|
||||
#define UD_GROUPID_COLUMN_INDEX 1
|
||||
#define UD_TAG_COLUMN_INDEX 2
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -488,7 +488,7 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdb
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bool *hasNext) {
|
||||
static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bool* hasNext) {
|
||||
bool asc = ASCENDING_TRAVERSE(pIter->order);
|
||||
int32_t step = asc ? 1 : -1;
|
||||
pIter->index += step;
|
||||
|
@ -2802,13 +2802,13 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
|
|||
SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
|
||||
|
||||
while (1) {
|
||||
bool hasNext = false;
|
||||
bool hasNext = false;
|
||||
int32_t code = filesetIteratorNext(&pStatus->fileIter, pReader, &hasNext);
|
||||
if (code) {
|
||||
taosArrayDestroy(pIndexList);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
||||
if (!hasNext) { // no data files on disk
|
||||
break;
|
||||
}
|
||||
|
@ -4688,6 +4688,11 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
|
|||
pTableBlockInfo->numOfVgroups = 1;
|
||||
|
||||
// find the start data block in file
|
||||
|
||||
tsdbAcquireReader(pReader);
|
||||
if (pReader->suspended) {
|
||||
tsdbReaderResume(pReader);
|
||||
}
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
|
||||
STsdbCfg* pc = &pReader->pTsdb->pVnode->config.tsdbCfg;
|
||||
|
@ -4749,7 +4754,7 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
|
|||
// tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables,
|
||||
// pReader->pFileGroup->fid, pReader->idStr);
|
||||
}
|
||||
|
||||
tsdbReleaseReader(pReader);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -775,13 +775,13 @@ int32_t maxFunction(SqlFunctionCtx* pCtx) {
|
|||
|
||||
static int32_t setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex);
|
||||
static int32_t setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos,
|
||||
int32_t rowIndex);
|
||||
int32_t rowIndex);
|
||||
|
||||
int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||
SMinmaxResInfo* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
|
||||
SMinmaxResInfo* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
|
||||
|
||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||
int32_t currentRow = pBlock->info.rows;
|
||||
|
@ -795,7 +795,7 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
case TSDB_DATA_TYPE_UBIGINT:
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
((int64_t*)pCol->pData)[currentRow] = pRes->v;
|
||||
// colDataAppendInt64(pCol, currentRow, &pRes->v);
|
||||
// colDataAppendInt64(pCol, currentRow, &pRes->v);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UINT:
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
|
@ -920,9 +920,7 @@ void appendSelectivityValue(SqlFunctionCtx* pCtx, int32_t rowIndex, int32_t pos)
|
|||
}
|
||||
}
|
||||
|
||||
void replaceTupleData(STuplePos* pDestPos, STuplePos* pSourcePos) {
|
||||
*pDestPos = *pSourcePos;
|
||||
}
|
||||
void replaceTupleData(STuplePos* pDestPos, STuplePos* pSourcePos) { *pDestPos = *pSourcePos; }
|
||||
|
||||
int32_t minMaxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int32_t isMinFunc) {
|
||||
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
||||
|
@ -1686,7 +1684,7 @@ int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
SPercentileInfo* ppInfo = (SPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
int32_t code = 0;
|
||||
double v = 0;
|
||||
double v = 0;
|
||||
|
||||
tMemBucket* pMemBucket = ppInfo->pMemBucket;
|
||||
if (pMemBucket != NULL && pMemBucket->total > 0) { // check for null
|
||||
|
@ -1714,7 +1712,7 @@ int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
|
||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
|
||||
|
||||
varDataSetLen(buf, len);
|
||||
colDataAppend(pCol, pBlock->info.rows, buf, false);
|
||||
|
||||
|
@ -1739,7 +1737,6 @@ _fin_error:
|
|||
|
||||
tMemBucketDestroy(pMemBucket);
|
||||
return code;
|
||||
|
||||
}
|
||||
|
||||
bool getApercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
|
@ -2100,7 +2097,7 @@ static void prepareBuf(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
|
||||
static int32_t firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SqlFunctionCtx* pCtx,
|
||||
SFirstLastRes* pInfo) {
|
||||
SFirstLastRes* pInfo) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
if (pCtx->subsidiaries.num <= 0) {
|
||||
|
@ -2153,7 +2150,8 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
|
||||
// All null data column, return directly.
|
||||
if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) && pInputCol->hasNull == true) {
|
||||
if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) &&
|
||||
pInputCol->hasNull == true) {
|
||||
// save selectivity value for column consisted of all null values
|
||||
int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -2269,7 +2267,8 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
|
||||
// All null data column, return directly.
|
||||
if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) && pInputCol->hasNull == true) {
|
||||
if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) &&
|
||||
pInputCol->hasNull == true) {
|
||||
// save selectivity value for column consisted of all null values
|
||||
int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -2363,7 +2362,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
|
||||
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
|
||||
char* data = colDataGetData(pInputCol, chosen);
|
||||
char* data = colDataGetData(pInputCol, chosen);
|
||||
int32_t code = doSaveCurrentVal(pCtx, i, cts, type, data);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -2374,7 +2373,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
|||
|
||||
for (int32_t i = pInput->startRowIndex + round * 4; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
|
||||
if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) {
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
int32_t code = doSaveCurrentVal(pCtx, i, pts[i], type, data);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -2391,7 +2390,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
|||
numOfElems++;
|
||||
|
||||
if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) {
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
int32_t code = doSaveCurrentVal(pCtx, i, pts[i], type, data);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -2438,7 +2437,7 @@ static int32_t firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* p
|
|||
}
|
||||
|
||||
static int32_t firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst,
|
||||
int32_t rowIndex) {
|
||||
int32_t rowIndex) {
|
||||
if (TSDB_CODE_SUCCESS == firstLastTransferInfoImpl(pInput, pOutput, isFirst)) {
|
||||
int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pOutput);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -2465,7 +2464,7 @@ static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuer
|
|||
for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
|
||||
char* data = colDataGetData(pCol, i);
|
||||
SFirstLastRes* pInputInfo = (SFirstLastRes*)varDataVal(data);
|
||||
int32_t code = firstLastTransferInfo(pCtx, pInputInfo, pInfo, isFirstQuery, i);
|
||||
int32_t code = firstLastTransferInfo(pCtx, pInputInfo, pInfo, isFirstQuery, i);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
@ -2697,7 +2696,7 @@ static int32_t doSetPrevVal(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
|
|||
}
|
||||
|
||||
static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, SColumnInfoData* pOutput, int32_t pos,
|
||||
int32_t order, int64_t ts) {
|
||||
int32_t order, int64_t ts) {
|
||||
int32_t factor = (order == TSDB_ORDER_ASC) ? 1 : -1;
|
||||
pDiffInfo->prevTs = ts;
|
||||
switch (type) {
|
||||
|
@ -2905,8 +2904,8 @@ static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) {
|
|||
return pRes;
|
||||
}
|
||||
|
||||
static int32_t doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
|
||||
uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery);
|
||||
static int32_t doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock,
|
||||
uint16_t type, uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery);
|
||||
|
||||
static void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery);
|
||||
|
||||
|
@ -2927,7 +2926,7 @@ int32_t topFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
|
||||
numOfElems++;
|
||||
char* data = colDataGetData(pCol, i);
|
||||
char* data = colDataGetData(pCol, i);
|
||||
int32_t code = doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, true);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -2961,7 +2960,7 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
|
||||
numOfElems++;
|
||||
char* data = colDataGetData(pCol, i);
|
||||
char* data = colDataGetData(pCol, i);
|
||||
int32_t code = doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, false);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -3013,7 +3012,7 @@ static int32_t topBotResComparFn(const void* p1, const void* p2, const void* par
|
|||
}
|
||||
|
||||
int32_t doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
|
||||
uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery) {
|
||||
uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery) {
|
||||
STopBotRes* pRes = getTopBotOutputInfo(pCtx);
|
||||
|
||||
SVariant val = {0};
|
||||
|
@ -3204,7 +3203,7 @@ static char* doLoadTupleData(SSerializeDataHandle* pHandle, const STuplePos* pPo
|
|||
if (pPage == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
char* p = pPage->data + pPos->offset;
|
||||
char* p = pPage->data + pPos->offset;
|
||||
releaseBufPage(pHandle->pBuf, pPage);
|
||||
return p;
|
||||
} else {
|
||||
|
@ -4665,7 +4664,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
|
|||
continue;
|
||||
}
|
||||
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
int32_t code = doReservoirSample(pCtx, pInfo, data, i);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -4685,7 +4684,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
|
||||
int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||
|
||||
SSampleInfo* pInfo = getSampleOutputInfo(pCtx);
|
||||
|
@ -5019,7 +5018,7 @@ int32_t modeFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
numOfElems++;
|
||||
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
int32_t code = doModeAdd(pInfo, i, pCtx, data);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -5425,7 +5424,6 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) {
|
|||
pDistInfo->numOfBlocks += p1.numOfBlocks;
|
||||
pDistInfo->numOfTables += p1.numOfTables;
|
||||
pDistInfo->numOfInmemRows += p1.numOfInmemRows;
|
||||
pDistInfo->numOfVgroups += p1.numOfVgroups;
|
||||
pDistInfo->totalSize += p1.totalSize;
|
||||
pDistInfo->totalRows += p1.totalRows;
|
||||
pDistInfo->numOfFiles += p1.numOfFiles;
|
||||
|
@ -5442,6 +5440,7 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) {
|
|||
pDistInfo->maxRows = p1.maxRows;
|
||||
}
|
||||
|
||||
pDistInfo->numOfVgroups += (p1.numOfTables != 0 ? 1 : 0);
|
||||
for (int32_t i = 0; i < tListLen(pDistInfo->blockRowsHisto); ++i) {
|
||||
pDistInfo->blockRowsHisto[i] += p1.blockRowsHisto[i];
|
||||
}
|
||||
|
@ -5460,7 +5459,6 @@ int32_t tSerializeBlockDistInfo(void* buf, int32_t bufLen, const STableBlockDist
|
|||
if (tEncodeU16(&encoder, pInfo->numOfFiles) < 0) return -1;
|
||||
if (tEncodeU32(&encoder, pInfo->numOfBlocks) < 0) return -1;
|
||||
if (tEncodeU32(&encoder, pInfo->numOfTables) < 0) return -1;
|
||||
if (tEncodeU32(&encoder, pInfo->numOfVgroups) < 0) return -1;
|
||||
|
||||
if (tEncodeU64(&encoder, pInfo->totalSize) < 0) return -1;
|
||||
if (tEncodeU64(&encoder, pInfo->totalRows) < 0) return -1;
|
||||
|
@ -5470,6 +5468,7 @@ int32_t tSerializeBlockDistInfo(void* buf, int32_t bufLen, const STableBlockDist
|
|||
if (tEncodeI32(&encoder, pInfo->defMinRows) < 0) return -1;
|
||||
if (tEncodeU32(&encoder, pInfo->numOfInmemRows) < 0) return -1;
|
||||
if (tEncodeU32(&encoder, pInfo->numOfSmallBlocks) < 0) return -1;
|
||||
if (tEncodeU32(&encoder, pInfo->numOfVgroups) < 0) return -1;
|
||||
|
||||
for (int32_t i = 0; i < tListLen(pInfo->blockRowsHisto); ++i) {
|
||||
if (tEncodeI32(&encoder, pInfo->blockRowsHisto[i]) < 0) return -1;
|
||||
|
@ -5492,7 +5491,6 @@ int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo
|
|||
if (tDecodeU16(&decoder, &pInfo->numOfFiles) < 0) return -1;
|
||||
if (tDecodeU32(&decoder, &pInfo->numOfBlocks) < 0) return -1;
|
||||
if (tDecodeU32(&decoder, &pInfo->numOfTables) < 0) return -1;
|
||||
if (tDecodeU32(&decoder, &pInfo->numOfVgroups) < 0) return -1;
|
||||
|
||||
if (tDecodeU64(&decoder, &pInfo->totalSize) < 0) return -1;
|
||||
if (tDecodeU64(&decoder, &pInfo->totalRows) < 0) return -1;
|
||||
|
@ -5502,6 +5500,7 @@ int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo
|
|||
if (tDecodeI32(&decoder, &pInfo->defMinRows) < 0) return -1;
|
||||
if (tDecodeU32(&decoder, &pInfo->numOfInmemRows) < 0) return -1;
|
||||
if (tDecodeU32(&decoder, &pInfo->numOfSmallBlocks) < 0) return -1;
|
||||
if (tDecodeU32(&decoder, &pInfo->numOfVgroups) < 0) return -1;
|
||||
|
||||
for (int32_t i = 0; i < tListLen(pInfo->blockRowsHisto); ++i) {
|
||||
if (tDecodeI32(&decoder, &pInfo->blockRowsHisto[i]) < 0) return -1;
|
||||
|
|
Loading…
Reference in New Issue