commit
dfbe9ace8a
|
@ -30,7 +30,7 @@ extern int32_t cDebugFlag;
|
|||
}
|
||||
#define tscWarn(...) \
|
||||
if (cDebugFlag & DEBUG_WARN) { \
|
||||
taosPrintLog("WARN TSC ", cDebugFlag, __VA_ARGS__); \
|
||||
taosPrintLog("WARN TSC ", cDebugFlag, __VA_ARGS__); \
|
||||
}
|
||||
#define tscTrace(...) \
|
||||
if (cDebugFlag & DEBUG_TRACE) { \
|
||||
|
|
|
@ -53,11 +53,7 @@ typedef struct STableComInfo {
|
|||
} STableComInfo;
|
||||
|
||||
typedef struct STableMeta {
|
||||
// super table if it is created according to super table, otherwise, tableInfo is used
|
||||
union {
|
||||
struct STableMeta *pSTable;
|
||||
STableComInfo tableInfo;
|
||||
};
|
||||
STableComInfo tableInfo;
|
||||
uint8_t tableType;
|
||||
int16_t sversion;
|
||||
SCMVgroupInfo vgroupInfo;
|
||||
|
|
|
@ -154,8 +154,8 @@ typedef struct SDataCol {
|
|||
static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; }
|
||||
|
||||
void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints);
|
||||
void dataColAppendVal(SDataCol *pCol, void *value, int numOfPoints, int maxPoints);
|
||||
void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfPoints);
|
||||
void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints);
|
||||
void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfRows);
|
||||
void dataColSetOffset(SDataCol *pCol, int nEle);
|
||||
|
||||
bool isNEleNull(SDataCol *pCol, int nEle);
|
||||
|
@ -195,7 +195,7 @@ typedef struct {
|
|||
int maxPoints; // max number of points
|
||||
int bufSize;
|
||||
|
||||
int numOfPoints;
|
||||
int numOfRows;
|
||||
int numOfCols; // Total number of cols
|
||||
int sversion; // TODO: set sversion
|
||||
void * buf;
|
||||
|
@ -205,7 +205,7 @@ typedef struct {
|
|||
#define keyCol(pCols) (&((pCols)->cols[0])) // Key column
|
||||
#define dataColsKeyAt(pCols, idx) ((TSKEY *)(keyCol(pCols)->pData))[(idx)]
|
||||
#define dataColsKeyFirst(pCols) dataColsKeyAt(pCols, 0)
|
||||
#define dataColsKeyLast(pCols) ((pCols->numOfPoints == 0) ? 0 : dataColsKeyAt(pCols, (pCols)->numOfPoints - 1))
|
||||
#define dataColsKeyLast(pCols) ((pCols->numOfRows == 0) ? 0 : dataColsKeyAt(pCols, (pCols)->numOfRows - 1))
|
||||
|
||||
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows);
|
||||
void tdResetDataCols(SDataCols *pCols);
|
||||
|
|
|
@ -187,29 +187,29 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints)
|
|||
|
||||
}
|
||||
|
||||
void dataColAppendVal(SDataCol *pCol, void *value, int numOfPoints, int maxPoints) {
|
||||
void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints) {
|
||||
ASSERT(pCol != NULL && value != NULL);
|
||||
|
||||
switch (pCol->type) {
|
||||
case TSDB_DATA_TYPE_BINARY:
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
// set offset
|
||||
pCol->dataOff[numOfPoints] = pCol->len;
|
||||
pCol->dataOff[numOfRows] = pCol->len;
|
||||
// Copy data
|
||||
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value));
|
||||
// Update the length
|
||||
pCol->len += varDataTLen(value);
|
||||
break;
|
||||
default:
|
||||
ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfPoints);
|
||||
ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
|
||||
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes);
|
||||
pCol->len += pCol->bytes;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfPoints) {
|
||||
int pointsLeft = numOfPoints - pointsToPop;
|
||||
void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfRows) {
|
||||
int pointsLeft = numOfRows - pointsToPop;
|
||||
|
||||
ASSERT(pointsLeft > 0);
|
||||
|
||||
|
@ -221,7 +221,7 @@ void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfPoints) {
|
|||
memmove(pCol->pData, POINTER_SHIFT(pCol->pData, toffset), pCol->len);
|
||||
dataColSetOffset(pCol, pointsLeft);
|
||||
} else {
|
||||
ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfPoints);
|
||||
ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
|
||||
pCol->len = TYPE_BYTES[pCol->type] * pointsLeft;
|
||||
memmove(pCol->pData, POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * pointsToPop), pCol->len);
|
||||
}
|
||||
|
@ -322,7 +322,7 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
|
|||
|
||||
pRet->numOfCols = pDataCols->numOfCols;
|
||||
pRet->sversion = pDataCols->sversion;
|
||||
if (keepData) pRet->numOfPoints = pDataCols->numOfPoints;
|
||||
if (keepData) pRet->numOfRows = pDataCols->numOfRows;
|
||||
|
||||
for (int i = 0; i < pDataCols->numOfCols; i++) {
|
||||
pRet->cols[i].type = pDataCols->cols[i].type;
|
||||
|
@ -352,7 +352,7 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
|
|||
}
|
||||
|
||||
void tdResetDataCols(SDataCols *pCols) {
|
||||
pCols->numOfPoints = 0;
|
||||
pCols->numOfRows = 0;
|
||||
for (int i = 0; i < pCols->maxCols; i++) {
|
||||
dataColReset(pCols->cols + i);
|
||||
}
|
||||
|
@ -365,14 +365,14 @@ void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols) {
|
|||
SDataCol *pCol = pCols->cols + i;
|
||||
void * value = tdGetRowDataOfCol(row, pCol->type, pCol->offset);
|
||||
|
||||
dataColAppendVal(pCol, value, pCols->numOfPoints, pCols->maxPoints);
|
||||
dataColAppendVal(pCol, value, pCols->numOfRows, pCols->maxPoints);
|
||||
}
|
||||
pCols->numOfPoints++;
|
||||
pCols->numOfRows++;
|
||||
}
|
||||
|
||||
// Pop pointsToPop points from the SDataCols
|
||||
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) {
|
||||
int pointsLeft = pCols->numOfPoints - pointsToPop;
|
||||
int pointsLeft = pCols->numOfRows - pointsToPop;
|
||||
if (pointsLeft <= 0) {
|
||||
tdResetDataCols(pCols);
|
||||
return;
|
||||
|
@ -380,14 +380,14 @@ void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) {
|
|||
|
||||
for (int iCol = 0; iCol < pCols->numOfCols; iCol++) {
|
||||
SDataCol *pCol = pCols->cols + iCol;
|
||||
dataColPopPoints(pCol, pointsToPop, pCols->numOfPoints);
|
||||
dataColPopPoints(pCol, pointsToPop, pCols->numOfRows);
|
||||
}
|
||||
pCols->numOfPoints = pointsLeft;
|
||||
pCols->numOfRows = pointsLeft;
|
||||
}
|
||||
|
||||
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
|
||||
ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfPoints);
|
||||
ASSERT(target->numOfPoints + rowsToMerge <= target->maxPoints);
|
||||
ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows);
|
||||
ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);
|
||||
ASSERT(target->numOfCols == source->numOfCols);
|
||||
|
||||
SDataCols *pTarget = NULL;
|
||||
|
@ -395,10 +395,10 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
|
|||
if (dataColsKeyLast(target) < dataColsKeyFirst(source)) { // No overlap
|
||||
for (int i = 0; i < rowsToMerge; i++) {
|
||||
for (int j = 0; j < source->numOfCols; j++) {
|
||||
dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i), target->numOfPoints,
|
||||
dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i), target->numOfRows,
|
||||
target->maxPoints);
|
||||
}
|
||||
target->numOfPoints++;
|
||||
target->numOfRows++;
|
||||
}
|
||||
} else {
|
||||
pTarget = tdDupDataCols(target, true);
|
||||
|
@ -406,7 +406,7 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
|
|||
|
||||
int iter1 = 0;
|
||||
int iter2 = 0;
|
||||
tdMergeTwoDataCols(target, pTarget, &iter1, source, &iter2, pTarget->numOfPoints + rowsToMerge);
|
||||
tdMergeTwoDataCols(target, pTarget, &iter1, source, &iter2, pTarget->numOfRows + rowsToMerge);
|
||||
}
|
||||
|
||||
tdFreeDataCols(pTarget);
|
||||
|
@ -421,30 +421,30 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCol
|
|||
// TODO: add resolve duplicate key here
|
||||
tdResetDataCols(target);
|
||||
|
||||
while (target->numOfPoints < tRows) {
|
||||
if (*iter1 >= src1->numOfPoints && *iter2 >= src2->numOfPoints) break;
|
||||
while (target->numOfRows < tRows) {
|
||||
if (*iter1 >= src1->numOfRows && *iter2 >= src2->numOfRows) break;
|
||||
|
||||
TSKEY key1 = (*iter1 >= src1->numOfPoints) ? INT64_MAX : ((TSKEY *)(src1->cols[0].pData))[*iter1];
|
||||
TSKEY key2 = (*iter2 >= src2->numOfPoints) ? INT64_MAX : ((TSKEY *)(src2->cols[0].pData))[*iter2];
|
||||
TSKEY key1 = (*iter1 >= src1->numOfRows) ? INT64_MAX : ((TSKEY *)(src1->cols[0].pData))[*iter1];
|
||||
TSKEY key2 = (*iter2 >= src2->numOfRows) ? INT64_MAX : ((TSKEY *)(src2->cols[0].pData))[*iter2];
|
||||
|
||||
if (key1 <= key2) {
|
||||
for (int i = 0; i < src1->numOfCols; i++) {
|
||||
ASSERT(target->cols[i].type == src1->cols[i].type);
|
||||
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfPoints,
|
||||
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows,
|
||||
target->maxPoints);
|
||||
}
|
||||
|
||||
target->numOfPoints++;
|
||||
target->numOfRows++;
|
||||
(*iter1)++;
|
||||
if (key1 == key2) (*iter2)++;
|
||||
} else {
|
||||
for (int i = 0; i < src2->numOfCols; i++) {
|
||||
ASSERT(target->cols[i].type == src2->cols[i].type);
|
||||
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfPoints,
|
||||
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfRows,
|
||||
target->maxPoints);
|
||||
}
|
||||
|
||||
target->numOfPoints++;
|
||||
target->numOfRows++;
|
||||
(*iter2)++;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -273,6 +273,46 @@ static void getStatics_d(const TSKEY *primaryKey, const void *pData, int32_t num
|
|||
#endif
|
||||
}
|
||||
|
||||
static void getStatics_bin(const TSKEY *primaryKey, const void *pData, int32_t numOfRow, int64_t *min, int64_t *max,
|
||||
int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int16_t *numOfNull) {
|
||||
const char* data = pData;
|
||||
ASSERT(numOfRow <= INT16_MAX);
|
||||
|
||||
for (int32_t i = 0; i < numOfRow; ++i) {
|
||||
if (isNull((const char*) varDataVal(data), TSDB_DATA_TYPE_BINARY)) {
|
||||
(*numOfNull) += 1;
|
||||
}
|
||||
|
||||
data += varDataLen(data);
|
||||
}
|
||||
|
||||
*sum = 0;
|
||||
*max = 0;
|
||||
*min = 0;
|
||||
*minIndex = 0;
|
||||
*maxIndex = 0;
|
||||
}
|
||||
|
||||
static void getStatics_nchr(const TSKEY *primaryKey, const void *pData, int32_t numOfRow, int64_t *min, int64_t *max,
|
||||
int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int16_t *numOfNull) {
|
||||
const char* data = pData;
|
||||
ASSERT(numOfRow <= INT16_MAX);
|
||||
|
||||
for (int32_t i = 0; i < numOfRow; ++i) {
|
||||
if (isNull((const char*) varDataVal(data), TSDB_DATA_TYPE_NCHAR)) {
|
||||
(*numOfNull) += 1;
|
||||
}
|
||||
|
||||
data += varDataLen(data);
|
||||
}
|
||||
|
||||
*sum = 0;
|
||||
*max = 0;
|
||||
*min = 0;
|
||||
*minIndex = 0;
|
||||
*maxIndex = 0;
|
||||
}
|
||||
|
||||
tDataTypeDescriptor tDataTypeDesc[11] = {
|
||||
{TSDB_DATA_TYPE_NULL, 6, 1, "NOTYPE", NULL, NULL, NULL},
|
||||
{TSDB_DATA_TYPE_BOOL, 4, CHAR_BYTES, "BOOL", tsCompressBool, tsDecompressBool, getStatics_i8},
|
||||
|
@ -282,9 +322,9 @@ tDataTypeDescriptor tDataTypeDesc[11] = {
|
|||
{TSDB_DATA_TYPE_BIGINT, 6, LONG_BYTES, "BIGINT", tsCompressBigint, tsDecompressBigint, getStatics_i64},
|
||||
{TSDB_DATA_TYPE_FLOAT, 5, FLOAT_BYTES, "FLOAT", tsCompressFloat, tsDecompressFloat, getStatics_f},
|
||||
{TSDB_DATA_TYPE_DOUBLE, 6, DOUBLE_BYTES, "DOUBLE", tsCompressDouble, tsDecompressDouble, getStatics_d},
|
||||
{TSDB_DATA_TYPE_BINARY, 6, 0, "BINARY", tsCompressString, tsDecompressString, NULL},
|
||||
{TSDB_DATA_TYPE_BINARY, 6, 0, "BINARY", tsCompressString, tsDecompressString, getStatics_bin},
|
||||
{TSDB_DATA_TYPE_TIMESTAMP, 9, LONG_BYTES, "TIMESTAMP", tsCompressTimestamp, tsDecompressTimestamp, getStatics_i64},
|
||||
{TSDB_DATA_TYPE_NCHAR, 5, 8, "NCHAR", tsCompressString, tsDecompressString, NULL},
|
||||
{TSDB_DATA_TYPE_NCHAR, 5, 8, "NCHAR", tsCompressString, tsDecompressString, getStatics_nchr},
|
||||
};
|
||||
|
||||
char tTokenTypeSwitcher[13] = {
|
||||
|
|
|
@ -200,7 +200,7 @@ bool getNeighborPoints(SQInfo *pQInfo, void *pMeterObj, SPointInterpoSupporter *
|
|||
return false;
|
||||
} else { // prev has been located
|
||||
if (pQuery->fileId >= 0) {
|
||||
pQuery->pos = pQuery->pBlock[pQuery->slot].numOfPoints - 1;
|
||||
pQuery->pos = pQuery->pBlock[pQuery->slot].numOfRows - 1;
|
||||
getOneRowFromDataBlock(pRuntimeEnv, pPointInterpSupporter->pPrevPoint, pQuery->pos);
|
||||
|
||||
qTrace("QInfo:%p get prev data point, fileId:%d, slot:%d, pos:%d, pQuery->pos:%d", GET_QINFO_ADDR(pQuery),
|
||||
|
@ -210,11 +210,11 @@ bool getNeighborPoints(SQInfo *pQInfo, void *pMeterObj, SPointInterpoSupporter *
|
|||
assert(vnodeIsDatablockLoaded(pRuntimeEnv, pMeterObj, -1, true) == DISK_BLOCK_NO_NEED_TO_LOAD);
|
||||
pBlock = &pRuntimeEnv->cacheBlock;
|
||||
|
||||
pQuery->pos = pBlock->numOfPoints - 1;
|
||||
pQuery->pos = pBlock->numOfRows - 1;
|
||||
getOneRowFromDataBlock(pRuntimeEnv, pPointInterpSupporter->pPrevPoint, pQuery->pos);
|
||||
|
||||
qTrace("QInfo:%p get prev data point, fileId:%d, slot:%d, pos:%d, pQuery->pos:%d", GET_QINFO_ADDR(pQuery),
|
||||
pQuery->fileId, pQuery->slot, pBlock->numOfPoints - 1, pQuery->pos);
|
||||
pQuery->fileId, pQuery->slot, pBlock->numOfRows - 1, pQuery->pos);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -603,9 +603,9 @@ static SWindowStatus *getTimeWindowResStatus(SWindowResInfo *pWindowResInfo, int
|
|||
return &pWindowResInfo->pResult[slot].status;
|
||||
}
|
||||
|
||||
static int32_t getForwardStepsInBlock(int32_t numOfPoints, __block_search_fn_t searchFn, TSKEY ekey, int16_t pos,
|
||||
static int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int16_t pos,
|
||||
int16_t order, int64_t *pData) {
|
||||
int32_t endPos = searchFn((char *)pData, numOfPoints, ekey, order);
|
||||
int32_t endPos = searchFn((char *)pData, numOfRows, ekey, order);
|
||||
int32_t forwardStep = 0;
|
||||
|
||||
if (endPos >= 0) {
|
||||
|
@ -2329,7 +2329,7 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle,
|
|||
|
||||
int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) {
|
||||
int32_t midPos = -1;
|
||||
int32_t numOfPoints;
|
||||
int32_t numOfRows;
|
||||
|
||||
if (num <= 0) {
|
||||
return -1;
|
||||
|
@ -2348,8 +2348,8 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) {
|
|||
if (key == keyList[firstPos]) return firstPos;
|
||||
if (key < keyList[firstPos]) return firstPos - 1;
|
||||
|
||||
numOfPoints = lastPos - firstPos + 1;
|
||||
midPos = (numOfPoints >> 1) + firstPos;
|
||||
numOfRows = lastPos - firstPos + 1;
|
||||
midPos = (numOfRows >> 1) + firstPos;
|
||||
|
||||
if (key < keyList[midPos]) {
|
||||
lastPos = midPos - 1;
|
||||
|
@ -2374,8 +2374,8 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) {
|
|||
return lastPos;
|
||||
}
|
||||
|
||||
numOfPoints = lastPos - firstPos + 1;
|
||||
midPos = (numOfPoints >> 1) + firstPos;
|
||||
numOfRows = lastPos - firstPos + 1;
|
||||
midPos = (numOfRows >> 1) + firstPos;
|
||||
|
||||
if (key < keyList[midPos]) {
|
||||
lastPos = midPos - 1;
|
||||
|
|
|
@ -74,7 +74,7 @@ void tsdbCloseMetaFile(SMetaFile *mfh);
|
|||
typedef struct {
|
||||
TSKEY keyFirst;
|
||||
TSKEY keyLast;
|
||||
int32_t numOfPoints;
|
||||
int32_t numOfRows;
|
||||
void * pData;
|
||||
} SMemTable;
|
||||
|
||||
|
@ -173,7 +173,7 @@ typedef struct {
|
|||
typedef struct {
|
||||
TSKEY keyFirst;
|
||||
TSKEY keyLast;
|
||||
int64_t numOfPoints;
|
||||
int64_t numOfRows;
|
||||
SList * list;
|
||||
} SCacheMem;
|
||||
|
||||
|
@ -294,7 +294,7 @@ typedef struct {
|
|||
int64_t last : 1; // If the block in data file or last file
|
||||
int64_t offset : 63; // Offset of data block or sub-block index depending on numOfSubBlocks
|
||||
int32_t algorithm : 8; // Compression algorithm
|
||||
int32_t numOfPoints : 24; // Number of total points
|
||||
int32_t numOfRows : 24; // Number of total points
|
||||
int32_t sversion; // Schema version
|
||||
int32_t len; // Data block length or nothing
|
||||
int16_t numOfSubBlocks; // Number of sub-blocks;
|
||||
|
|
|
@ -82,7 +82,7 @@ void *tsdbAllocFromCache(STsdbCache *pCache, int bytes, TSKEY key) {
|
|||
memset(ptr, 0, bytes);
|
||||
if (key < pCache->mem->keyFirst) pCache->mem->keyFirst = key;
|
||||
if (key > pCache->mem->keyLast) pCache->mem->keyLast = key;
|
||||
pCache->mem->numOfPoints++;
|
||||
pCache->mem->numOfRows++;
|
||||
|
||||
return ptr;
|
||||
}
|
||||
|
@ -127,7 +127,7 @@ static int tsdbAllocBlockFromPool(STsdbCache *pCache) {
|
|||
if (pCache->mem == NULL) return -1;
|
||||
pCache->mem->keyFirst = INT64_MAX;
|
||||
pCache->mem->keyLast = 0;
|
||||
pCache->mem->numOfPoints = 0;
|
||||
pCache->mem->numOfRows = 0;
|
||||
pCache->mem->list = tdListNew(sizeof(STsdbCacheBlock *));
|
||||
}
|
||||
|
||||
|
|
|
@ -233,10 +233,10 @@ SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) {
|
|||
// SCompBlock *pBlock = pStartBlock;
|
||||
// for (int i = 0; i < numOfBlocks; i++) {
|
||||
// if (tsdbLoadCompCols(pFile, pBlock, (void *)pCompData) < 0) return -1;
|
||||
// pCols->numOfPoints += (pCompData->cols[0].len / 8);
|
||||
// pCols->numOfRows += (pCompData->cols[0].len / 8);
|
||||
// for (int iCol = 0; iCol < pBlock->numOfCols; iCol++) {
|
||||
// SCompCol *pCompCol = &(pCompData->cols[iCol]);
|
||||
// // pCols->numOfPoints += pBlock->numOfPoints;
|
||||
// // pCols->numOfRows += pBlock->numOfRows;
|
||||
// int k = 0;
|
||||
// for (; k < pCols->numOfCols; k++) {
|
||||
// if (pCompCol->colId == pCols->cols[k].colId) break;
|
||||
|
|
|
@ -830,7 +830,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
|
|||
tSkipListNewNodeInfo(pTable->mem->pData, &level, &headSize);
|
||||
|
||||
TSKEY key = dataRowKey(row);
|
||||
// printf("insert:%lld, size:%d\n", key, pTable->mem->numOfPoints);
|
||||
// printf("insert:%lld, size:%d\n", key, pTable->mem->numOfRows);
|
||||
|
||||
// Copy row into the memory
|
||||
SSkipListNode *pNode = tsdbAllocFromCache(pRepo->tsdbCache, headSize + dataRowLen(row), key);
|
||||
|
@ -854,7 +854,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
|
|||
if (key < pTable->mem->keyFirst) pTable->mem->keyFirst = key;
|
||||
if (key > pTable->lastKey) pTable->lastKey = key;
|
||||
|
||||
pTable->mem->numOfPoints = tSkipListGetSize(pTable->mem->pData);
|
||||
pTable->mem->numOfRows = tSkipListGetSize(pTable->mem->pData);
|
||||
|
||||
tsdbTrace("vgId:%d, tid:%d, uid:%" PRId64 ", table:%s a row is inserted to table! key:%" PRId64, pRepo->config.tsdbId,
|
||||
pTable->tableId.tid, pTable->tableId.uid, varDataVal(pTable->name), dataRowKey(row));
|
||||
|
@ -1063,7 +1063,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
|
|||
while (true) {
|
||||
int rowsRead = tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pDataCols);
|
||||
assert(rowsRead >= 0);
|
||||
if (pDataCols->numOfPoints == 0) break;
|
||||
if (pDataCols->numOfRows == 0) break;
|
||||
nLoop++;
|
||||
|
||||
ASSERT(dataColsKeyFirst(pDataCols) >= minKey && dataColsKeyFirst(pDataCols) <= maxKey);
|
||||
|
@ -1072,13 +1072,13 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
|
|||
int rowsWritten = tsdbWriteDataBlock(pHelper, pDataCols);
|
||||
ASSERT(rowsWritten != 0);
|
||||
if (rowsWritten < 0) goto _err;
|
||||
ASSERT(rowsWritten <= pDataCols->numOfPoints);
|
||||
ASSERT(rowsWritten <= pDataCols->numOfRows);
|
||||
|
||||
tdPopDataColsPoints(pDataCols, rowsWritten);
|
||||
maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pDataCols->numOfPoints;
|
||||
maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pDataCols->numOfRows;
|
||||
}
|
||||
|
||||
ASSERT(pDataCols->numOfPoints == 0);
|
||||
ASSERT(pDataCols->numOfRows == 0);
|
||||
|
||||
// Move the last block to the new .l file if neccessary
|
||||
if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) {
|
||||
|
|
|
@ -307,7 +307,7 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) {
|
|||
*/
|
||||
int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) {
|
||||
ASSERT(TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER);
|
||||
ASSERT(pDataCols->numOfPoints > 0);
|
||||
ASSERT(pDataCols->numOfRows > 0);
|
||||
|
||||
SCompBlock compBlock;
|
||||
int rowsToWrite = 0;
|
||||
|
@ -322,7 +322,7 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) {
|
|||
|
||||
if (pIdx->offset == 0 || (!pIdx->hasLast && keyFirst > pIdx->maxKey)) { // Just append as a super block
|
||||
ASSERT(pHelper->hasOldLastBlock == false);
|
||||
rowsToWrite = pDataCols->numOfPoints;
|
||||
rowsToWrite = pDataCols->numOfRows;
|
||||
SFile *pWFile = NULL;
|
||||
bool isLast = false;
|
||||
|
||||
|
@ -380,10 +380,10 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
|
|||
|
||||
if (pCompBlock->numOfSubBlocks > 1) {
|
||||
if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, pIdx->numOfBlocks - 1), NULL) < 0) return -1;
|
||||
ASSERT(pHelper->pDataCols[0]->numOfPoints > 0 &&
|
||||
pHelper->pDataCols[0]->numOfPoints < pHelper->config.minRowsPerFileBlock);
|
||||
ASSERT(pHelper->pDataCols[0]->numOfRows > 0 &&
|
||||
pHelper->pDataCols[0]->numOfRows < pHelper->config.minRowsPerFileBlock);
|
||||
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0],
|
||||
pHelper->pDataCols[0]->numOfPoints, &compBlock, true, true) < 0)
|
||||
pHelper->pDataCols[0]->numOfRows, &compBlock, true, true) < 0)
|
||||
return -1;
|
||||
|
||||
if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1;
|
||||
|
@ -625,13 +625,13 @@ int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int blkIdx,
|
|||
for (int i = 1; i < numOfSubBlocks; i++) {
|
||||
pStartBlock++;
|
||||
if (tsdbLoadSingleBlockDataCols(pHelper, pStartBlock, colIds, numOfColIds, pHelper->pDataCols[1]) < 0) return -1;
|
||||
tdMergeDataCols(pDataCols, pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfPoints);
|
||||
tdMergeDataCols(pDataCols, pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfPoints,
|
||||
static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfRows,
|
||||
int maxPoints, char *buffer, int bufferSize) {
|
||||
// Verify by checksum
|
||||
if (!taosCheckChecksumWhole((uint8_t *)content, len)) return -1;
|
||||
|
@ -640,16 +640,16 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32
|
|||
if (comp) {
|
||||
// // Need to decompress
|
||||
pDataCol->len = (*(tDataTypeDesc[pDataCol->type].decompFunc))(
|
||||
content, len - sizeof(TSCKSUM), numOfPoints, pDataCol->pData, pDataCol->spaceSize, comp, buffer, bufferSize);
|
||||
content, len - sizeof(TSCKSUM), numOfRows, pDataCol->pData, pDataCol->spaceSize, comp, buffer, bufferSize);
|
||||
if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) {
|
||||
dataColSetOffset(pDataCol, numOfPoints);
|
||||
dataColSetOffset(pDataCol, numOfRows);
|
||||
}
|
||||
} else {
|
||||
// No need to decompress, just memcpy it
|
||||
pDataCol->len = len - sizeof(TSCKSUM);
|
||||
memcpy(pDataCol->pData, content, pDataCol->len);
|
||||
if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) {
|
||||
dataColSetOffset(pDataCol, numOfPoints);
|
||||
dataColSetOffset(pDataCol, numOfRows);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
|
@ -673,7 +673,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
|
|||
int32_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols + sizeof(TSCKSUM);
|
||||
if (!taosCheckChecksumWhole((uint8_t *)pCompData, tsize)) goto _err;
|
||||
|
||||
pDataCols->numOfPoints = pCompBlock->numOfPoints;
|
||||
pDataCols->numOfRows = pCompBlock->numOfRows;
|
||||
|
||||
// Recover the data
|
||||
int ccol = 0;
|
||||
|
@ -682,7 +682,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
|
|||
SDataCol *pDataCol = &(pDataCols->cols[dcol]);
|
||||
if (ccol >= pCompData->numOfCols) {
|
||||
// Set current column as NULL and forward
|
||||
dataColSetNEleNull(pDataCol, pCompBlock->numOfPoints, pDataCols->maxPoints);
|
||||
dataColSetNEleNull(pDataCol, pCompBlock->numOfRows, pDataCols->maxPoints);
|
||||
dcol++;
|
||||
continue;
|
||||
}
|
||||
|
@ -691,15 +691,15 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
|
|||
|
||||
if (pCompCol->colId == pDataCol->colId) {
|
||||
if (pCompBlock->algorithm == TWO_STAGE_COMP) {
|
||||
int zsize = pDataCol->bytes * pCompBlock->numOfPoints + COMP_OVERFLOW_BYTES;
|
||||
int zsize = pDataCol->bytes * pCompBlock->numOfRows + COMP_OVERFLOW_BYTES;
|
||||
if (pCompCol->type == TSDB_DATA_TYPE_BINARY || pCompCol->type == TSDB_DATA_TYPE_NCHAR) {
|
||||
zsize += (sizeof(VarDataLenT) * pCompBlock->numOfPoints);
|
||||
zsize += (sizeof(VarDataLenT) * pCompBlock->numOfRows);
|
||||
}
|
||||
pHelper->compBuffer = trealloc(pHelper->compBuffer, zsize);
|
||||
if (pHelper->compBuffer == NULL) goto _err;
|
||||
}
|
||||
if (tsdbCheckAndDecodeColumnData(pDataCol, (char *)pCompData + tsize + pCompCol->offset, pCompCol->len,
|
||||
pCompBlock->algorithm, pCompBlock->numOfPoints, pDataCols->maxPoints,
|
||||
pCompBlock->algorithm, pCompBlock->numOfRows, pDataCols->maxPoints,
|
||||
pHelper->compBuffer, tsizeof(pHelper->compBuffer)) < 0)
|
||||
goto _err;
|
||||
dcol++;
|
||||
|
@ -708,7 +708,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
|
|||
ccol++;
|
||||
} else {
|
||||
// Set current column as NULL and forward
|
||||
dataColSetNEleNull(pDataCol, pCompBlock->numOfPoints, pDataCols->maxPoints);
|
||||
dataColSetNEleNull(pDataCol, pCompBlock->numOfRows, pDataCols->maxPoints);
|
||||
dcol++;
|
||||
}
|
||||
}
|
||||
|
@ -732,7 +732,7 @@ int tsdbLoadBlockData(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *tar
|
|||
tdResetDataCols(pHelper->pDataCols[1]);
|
||||
pCompBlock++;
|
||||
if (tsdbLoadBlockDataImpl(pHelper, pCompBlock, pHelper->pDataCols[1]) < 0) goto _err;
|
||||
if (tdMergeDataCols(pHelper->pDataCols[0], pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfPoints) < 0) goto _err;
|
||||
if (tdMergeDataCols(pHelper->pDataCols[0], pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows) < 0) goto _err;
|
||||
}
|
||||
|
||||
// if (target) TODO
|
||||
|
@ -753,7 +753,7 @@ static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) {
|
|||
|
||||
static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite, SCompBlock *pCompBlock,
|
||||
bool isLast, bool isSuperBlock) {
|
||||
ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfPoints &&
|
||||
ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfRows &&
|
||||
rowsToWrite <= pHelper->config.maxRowsPerFileBlock);
|
||||
|
||||
SCompData *pCompData = (SCompData *)(pHelper->pBuffer);
|
||||
|
@ -840,7 +840,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
|
|||
pCompBlock->last = isLast;
|
||||
pCompBlock->offset = offset;
|
||||
pCompBlock->algorithm = pHelper->config.compress;
|
||||
pCompBlock->numOfPoints = rowsToWrite;
|
||||
pCompBlock->numOfRows = rowsToWrite;
|
||||
pCompBlock->sversion = pHelper->tableInfo.sversion;
|
||||
pCompBlock->len = (int32_t)lsize;
|
||||
pCompBlock->numOfSubBlocks = isSuperBlock ? 1 : 0;
|
||||
|
@ -877,7 +877,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
|
|||
int rowsWritten = 0;
|
||||
SCompBlock compBlock = {0};
|
||||
|
||||
ASSERT(pDataCols->numOfPoints > 0);
|
||||
ASSERT(pDataCols->numOfRows > 0);
|
||||
TSKEY keyFirst = dataColsKeyFirst(pDataCols);
|
||||
|
||||
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
|
||||
|
@ -889,32 +889,32 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
|
|||
// ASSERT(compareKeyBlock((void *)&keyFirst, (void *)pCompBlock) == 0);
|
||||
|
||||
if (keyFirst > blockAtIdx(pHelper, blkIdx)->keyLast) { // Merge with the last block by append
|
||||
ASSERT(blockAtIdx(pHelper, blkIdx)->numOfPoints < pHelper->config.minRowsPerFileBlock && blkIdx == pIdx->numOfBlocks-1);
|
||||
ASSERT(blockAtIdx(pHelper, blkIdx)->numOfRows < pHelper->config.minRowsPerFileBlock && blkIdx == pIdx->numOfBlocks-1);
|
||||
int defaultRowsToWrite = pHelper->config.maxRowsPerFileBlock * 4 / 5; // TODO: make a interface
|
||||
|
||||
rowsWritten = MIN((defaultRowsToWrite - blockAtIdx(pHelper, blkIdx)->numOfPoints), pDataCols->numOfPoints);
|
||||
rowsWritten = MIN((defaultRowsToWrite - blockAtIdx(pHelper, blkIdx)->numOfRows), pDataCols->numOfRows);
|
||||
if ((blockAtIdx(pHelper, blkIdx)->numOfSubBlocks < TSDB_MAX_SUBBLOCKS) &&
|
||||
(blockAtIdx(pHelper, blkIdx)->numOfPoints + rowsWritten < pHelper->config.minRowsPerFileBlock) && (pHelper->files.nLastF.fd) > 0) {
|
||||
(blockAtIdx(pHelper, blkIdx)->numOfRows + rowsWritten < pHelper->config.minRowsPerFileBlock) && (pHelper->files.nLastF.fd) > 0) {
|
||||
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, rowsWritten, &compBlock, true, false) < 0)
|
||||
goto _err;
|
||||
if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err;
|
||||
} else {
|
||||
// Load
|
||||
if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err;
|
||||
ASSERT(pHelper->pDataCols[0]->numOfPoints == blockAtIdx(pHelper, blkIdx)->numOfPoints);
|
||||
ASSERT(pHelper->pDataCols[0]->numOfRows == blockAtIdx(pHelper, blkIdx)->numOfRows);
|
||||
// Merge
|
||||
if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsWritten) < 0) goto _err;
|
||||
// Write
|
||||
SFile *pWFile = NULL;
|
||||
bool isLast = false;
|
||||
if (pHelper->pDataCols[0]->numOfPoints >= pHelper->config.minRowsPerFileBlock) {
|
||||
if (pHelper->pDataCols[0]->numOfRows >= pHelper->config.minRowsPerFileBlock) {
|
||||
pWFile = &(pHelper->files.dataF);
|
||||
} else {
|
||||
isLast = true;
|
||||
pWFile = (pHelper->files.nLastF.fd > 0) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF);
|
||||
}
|
||||
if (tsdbWriteBlockToFile(pHelper, pWFile, pHelper->pDataCols[0],
|
||||
pHelper->pDataCols[0]->numOfPoints, &compBlock, isLast, true) < 0)
|
||||
pHelper->pDataCols[0]->numOfRows, &compBlock, isLast, true) < 0)
|
||||
goto _err;
|
||||
if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err;
|
||||
}
|
||||
|
@ -931,7 +931,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
|
|||
// rows1: number of rows must merge in this block
|
||||
int rows1 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, blockAtIdx(pHelper, blkIdx)->keyLast);
|
||||
// rows2: max nuber of rows the block can have more
|
||||
int rows2 = pHelper->config.maxRowsPerFileBlock - blockAtIdx(pHelper, blkIdx)->numOfPoints;
|
||||
int rows2 = pHelper->config.maxRowsPerFileBlock - blockAtIdx(pHelper, blkIdx)->numOfRows;
|
||||
// rows3: number of rows between this block and the next block
|
||||
int rows3 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, keyLimit);
|
||||
|
||||
|
@ -939,7 +939,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
|
|||
|
||||
if ((rows2 >= rows1) &&
|
||||
(( blockAtIdx(pHelper, blkIdx)->last) ||
|
||||
((rows1 + blockAtIdx(pHelper, blkIdx)->numOfPoints < pHelper->config.minRowsPerFileBlock) && (pHelper->files.nLastF.fd < 0)))) {
|
||||
((rows1 + blockAtIdx(pHelper, blkIdx)->numOfRows < pHelper->config.minRowsPerFileBlock) && (pHelper->files.nLastF.fd < 0)))) {
|
||||
rowsWritten = rows1;
|
||||
bool isLast = false;
|
||||
SFile *pFile = NULL;
|
||||
|
@ -965,11 +965,11 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
|
|||
int round = 0;
|
||||
// tdResetDataCols(pHelper->pDataCols[1]);
|
||||
while (true) {
|
||||
if (iter1 >= pHelper->pDataCols[0]->numOfPoints && iter2 >= rows3) break;
|
||||
if (iter1 >= pHelper->pDataCols[0]->numOfRows && iter2 >= rows3) break;
|
||||
tdMergeTwoDataCols(pHelper->pDataCols[1], pHelper->pDataCols[0], &iter1, pDataCols, &iter2, pHelper->config.maxRowsPerFileBlock * 4 / 5);
|
||||
ASSERT(pHelper->pDataCols[1]->numOfPoints > 0);
|
||||
ASSERT(pHelper->pDataCols[1]->numOfRows > 0);
|
||||
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pHelper->pDataCols[1],
|
||||
pHelper->pDataCols[1]->numOfPoints, &compBlock, false, true) < 0)
|
||||
pHelper->pDataCols[1]->numOfRows, &compBlock, false, true) < 0)
|
||||
goto _err;
|
||||
if (round == 0) {
|
||||
tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx);
|
||||
|
@ -980,17 +980,17 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
|
|||
blkIdx++;
|
||||
// TODO: the blkIdx here is not correct
|
||||
|
||||
// if (iter1 >= pHelper->pDataCols[0]->numOfPoints && iter2 >= rows3) {
|
||||
// if (pHelper->pDataCols[1]->numOfPoints > 0) {
|
||||
// if (iter1 >= pHelper->pDataCols[0]->numOfRows && iter2 >= rows3) {
|
||||
// if (pHelper->pDataCols[1]->numOfRows > 0) {
|
||||
// if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[1],
|
||||
// pHelper->pDataCols[1]->numOfPoints, &compBlock, false, true) < 0)
|
||||
// pHelper->pDataCols[1]->numOfRows, &compBlock, false, true) < 0)
|
||||
// goto _err;
|
||||
// // TODO: the blkIdx here is not correct
|
||||
// tsdbAddSubBlock(pHelper, &compBlock, blkIdx, pHelper->pDataCols[1]->numOfPoints);
|
||||
// tsdbAddSubBlock(pHelper, &compBlock, blkIdx, pHelper->pDataCols[1]->numOfRows);
|
||||
// }
|
||||
// }
|
||||
|
||||
// TSKEY key1 = iter1 >= pHelper->pDataCols[0]->numOfPoints
|
||||
// TSKEY key1 = iter1 >= pHelper->pDataCols[0]->numOfRows
|
||||
// ? INT64_MAX
|
||||
// : ((int64_t *)(pHelper->pDataCols[0]->cols[0].pData))[iter1];
|
||||
// TSKEY key2 = iter2 >= rowsWritten ? INT64_MAX : ((int64_t *)(pDataCols->cols[0].pData))[iter2];
|
||||
|
@ -998,11 +998,11 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
|
|||
// if (key1 < key2) {
|
||||
// for (int i = 0; i < pDataCols->numOfCols; i++) {
|
||||
// SDataCol *pDataCol = pHelper->pDataCols[1]->cols + i;
|
||||
// memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfPoints),
|
||||
// memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfRows),
|
||||
// ((char *)pHelper->pDataCols[0]->cols[i].pData + TYPE_BYTES[pDataCol->type] * iter1),
|
||||
// TYPE_BYTES[pDataCol->type]);
|
||||
// }
|
||||
// pHelper->pDataCols[1]->numOfPoints++;
|
||||
// pHelper->pDataCols[1]->numOfRows++;
|
||||
// iter1++;
|
||||
// } else if (key1 == key2) {
|
||||
// // TODO: think about duplicate key cases
|
||||
|
@ -1010,17 +1010,17 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
|
|||
// } else {
|
||||
// for (int i = 0; i < pDataCols->numOfCols; i++) {
|
||||
// SDataCol *pDataCol = pHelper->pDataCols[1]->cols + i;
|
||||
// memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfPoints),
|
||||
// memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfRows),
|
||||
// ((char *)pDataCols->cols[i].pData +
|
||||
// TYPE_BYTES[pDataCol->type] * iter2),
|
||||
// TYPE_BYTES[pDataCol->type]);
|
||||
// }
|
||||
// pHelper->pDataCols[1]->numOfPoints++;
|
||||
// pHelper->pDataCols[1]->numOfRows++;
|
||||
// iter2++;
|
||||
// }
|
||||
|
||||
// if (pHelper->pDataCols[0]->numOfPoints >= pHelper->config.maxRowsPerFileBlock * 4 / 5) {
|
||||
// if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfPoints, &compBlock, false, true) < 0) goto _err;
|
||||
// if (pHelper->pDataCols[0]->numOfRows >= pHelper->config.maxRowsPerFileBlock * 4 / 5) {
|
||||
// if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows, &compBlock, false, true) < 0) goto _err;
|
||||
// // TODO: blkIdx here is not correct, fix it
|
||||
// tsdbInsertSuperBlock(pHelper, &compBlock, blkIdx);
|
||||
|
||||
|
@ -1133,7 +1133,7 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId
|
|||
pSCompBlock->numOfSubBlocks++;
|
||||
ASSERT(pSCompBlock->numOfSubBlocks <= TSDB_MAX_SUBBLOCKS);
|
||||
pSCompBlock->len += sizeof(SCompBlock);
|
||||
pSCompBlock->numOfPoints += rowsAdded;
|
||||
pSCompBlock->numOfRows += rowsAdded;
|
||||
pSCompBlock->keyFirst = MIN(pSCompBlock->keyFirst, pCompBlock->keyFirst);
|
||||
pSCompBlock->keyLast = MAX(pSCompBlock->keyLast, pCompBlock->keyLast);
|
||||
pIdx->len += sizeof(SCompBlock);
|
||||
|
@ -1164,7 +1164,7 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId
|
|||
((SCompBlock *)ptr)[1] = *pCompBlock;
|
||||
|
||||
pSCompBlock->numOfSubBlocks = 2;
|
||||
pSCompBlock->numOfPoints += rowsAdded;
|
||||
pSCompBlock->numOfRows += rowsAdded;
|
||||
pSCompBlock->offset = ((char *)ptr) - ((char *)pHelper->pCompInfo);
|
||||
pSCompBlock->len = sizeof(SCompBlock) * 2;
|
||||
pSCompBlock->keyFirst = MIN(((SCompBlock *)ptr)[0].keyFirst, ((SCompBlock *)ptr)[1].keyFirst);
|
||||
|
@ -1219,7 +1219,7 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
|
|||
|
||||
// Get the number of rows in range [minKey, maxKey]
|
||||
static int tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey) {
|
||||
if (pDataCols->numOfPoints == 0) return 0;
|
||||
if (pDataCols->numOfRows == 0) return 0;
|
||||
|
||||
ASSERT(minKey <= maxKey);
|
||||
TSKEY keyFirst = dataColsKeyFirst(pDataCols);
|
||||
|
@ -1228,11 +1228,11 @@ static int tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey)
|
|||
|
||||
if (minKey > keyLast || maxKey < keyFirst) return 0;
|
||||
|
||||
void *ptr1 = taosbsearch((void *)&minKey, (void *)pDataCols->cols[0].pData, pDataCols->numOfPoints, sizeof(TSKEY),
|
||||
void *ptr1 = taosbsearch((void *)&minKey, (void *)pDataCols->cols[0].pData, pDataCols->numOfRows, sizeof(TSKEY),
|
||||
compTSKEY, TD_GE);
|
||||
ASSERT(ptr1 != NULL);
|
||||
|
||||
void *ptr2 = taosbsearch((void *)&maxKey, (void *)pDataCols->cols[0].pData, pDataCols->numOfPoints, sizeof(TSKEY),
|
||||
void *ptr2 = taosbsearch((void *)&maxKey, (void *)pDataCols->cols[0].pData, pDataCols->numOfRows, sizeof(TSKEY),
|
||||
compTSKEY, TD_LE);
|
||||
ASSERT(ptr2 != NULL);
|
||||
|
||||
|
|
|
@ -436,7 +436,7 @@ static SDataBlockInfo getTrueDataBlockInfo(STableCheckInfo* pCheckInfo, SCompBlo
|
|||
SDataBlockInfo info = {
|
||||
.window = {.skey = pBlock->keyFirst, .ekey = pBlock->keyLast},
|
||||
.numOfCols = pBlock->numOfCols,
|
||||
.rows = pBlock->numOfPoints,
|
||||
.rows = pBlock->numOfRows,
|
||||
.tid = pCheckInfo->tableId.tid,
|
||||
.uid = pCheckInfo->tableId.uid,
|
||||
};
|
||||
|
@ -608,11 +608,11 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
|
|||
}
|
||||
|
||||
SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0];
|
||||
assert(pTSCol->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTSCol->numOfPoints == pBlock->numOfPoints);
|
||||
assert(pTSCol->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTSCol->numOfRows == pBlock->numOfRows);
|
||||
|
||||
if (pCheckInfo->lastKey > pBlock->keyFirst) {
|
||||
cur->pos =
|
||||
binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfPoints, pCheckInfo->lastKey, pQueryHandle->order);
|
||||
binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pQueryHandle->order);
|
||||
} else {
|
||||
cur->pos = 0;
|
||||
}
|
||||
|
@ -630,9 +630,9 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
|
|||
SDataCols* pDataCols = pCheckInfo->pDataCols;
|
||||
if (pCheckInfo->lastKey < pBlock->keyLast) {
|
||||
cur->pos =
|
||||
binarySearchForKey(pDataCols->cols[0].pData, pBlock->numOfPoints, pCheckInfo->lastKey, pQueryHandle->order);
|
||||
binarySearchForKey(pDataCols->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pQueryHandle->order);
|
||||
} else {
|
||||
cur->pos = pBlock->numOfPoints - 1;
|
||||
cur->pos = pBlock->numOfRows - 1;
|
||||
}
|
||||
|
||||
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, sa);
|
||||
|
@ -647,7 +647,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
|
|||
|
||||
static int vnodeBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
|
||||
int firstPos, lastPos, midPos = -1;
|
||||
int numOfPoints;
|
||||
int numOfRows;
|
||||
TSKEY* keyList;
|
||||
|
||||
assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);
|
||||
|
@ -665,8 +665,8 @@ static int vnodeBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
|
|||
if (key == keyList[firstPos]) return firstPos;
|
||||
if (key < keyList[firstPos]) return firstPos - 1;
|
||||
|
||||
numOfPoints = lastPos - firstPos + 1;
|
||||
midPos = (numOfPoints >> 1) + firstPos;
|
||||
numOfRows = lastPos - firstPos + 1;
|
||||
midPos = (numOfRows >> 1) + firstPos;
|
||||
|
||||
if (key < keyList[midPos]) {
|
||||
lastPos = midPos - 1;
|
||||
|
@ -691,8 +691,8 @@ static int vnodeBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
|
|||
return lastPos;
|
||||
}
|
||||
|
||||
numOfPoints = lastPos - firstPos + 1;
|
||||
midPos = (numOfPoints >> 1) + firstPos;
|
||||
numOfRows = lastPos - firstPos + 1;
|
||||
midPos = (numOfRows >> 1) + firstPos;
|
||||
|
||||
if (key < keyList[midPos]) {
|
||||
lastPos = midPos - 1;
|
||||
|
@ -810,7 +810,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
|
|||
cur->mixBlock = (cur->pos != blockInfo.rows - 1);
|
||||
} else {
|
||||
int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
|
||||
endPos = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, pQueryHandle->window.ekey, order);
|
||||
endPos = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pQueryHandle->window.ekey, order);
|
||||
cur->mixBlock = true;
|
||||
}
|
||||
|
||||
|
@ -904,7 +904,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
|
|||
}
|
||||
|
||||
int32_t order = ASCENDING_TRAVERSE(pQueryHandle->order) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
|
||||
int32_t end = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, key, order);
|
||||
int32_t end = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order);
|
||||
if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it
|
||||
tSkipListIterNext(pCheckInfo->iter);
|
||||
}
|
||||
|
@ -1002,7 +1002,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
|
|||
|
||||
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
|
||||
int firstPos, lastPos, midPos = -1;
|
||||
int numOfPoints;
|
||||
int numOfRows;
|
||||
TSKEY* keyList;
|
||||
|
||||
if (num <= 0) return -1;
|
||||
|
@ -1018,8 +1018,8 @@ int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
|
|||
if (key == keyList[firstPos]) return firstPos;
|
||||
if (key < keyList[firstPos]) return firstPos - 1;
|
||||
|
||||
numOfPoints = lastPos - firstPos + 1;
|
||||
midPos = (numOfPoints >> 1) + firstPos;
|
||||
numOfRows = lastPos - firstPos + 1;
|
||||
midPos = (numOfRows >> 1) + firstPos;
|
||||
|
||||
if (key < keyList[midPos]) {
|
||||
lastPos = midPos - 1;
|
||||
|
@ -1044,8 +1044,8 @@ int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
|
|||
return lastPos;
|
||||
}
|
||||
|
||||
numOfPoints = lastPos - firstPos + 1;
|
||||
midPos = (numOfPoints >> 1) + firstPos;
|
||||
numOfRows = lastPos - firstPos + 1;
|
||||
midPos = (numOfRows >> 1) + firstPos;
|
||||
|
||||
if (key < keyList[midPos]) {
|
||||
lastPos = midPos - 1;
|
||||
|
@ -1066,7 +1066,6 @@ static void cleanBlockOrderSupporter(SBlockOrderSupporter* pSupporter, int32_t n
|
|||
|
||||
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||
STableBlockInfo* pBlockInfo = pSupporter->pDataBlockInfo[i];
|
||||
// tfree(pBlockInfo->statis);
|
||||
tfree(pBlockInfo);
|
||||
}
|
||||
|
||||
|
@ -1539,9 +1538,19 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta
|
|||
STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[cur->slot];
|
||||
tsdbLoadCompData(&pHandle->rhelper, pBlockInfo->compBlock, NULL);
|
||||
|
||||
tsdbGetDataStatis(&pHandle->rhelper, pHandle->statis, QH_GET_NUM_OF_COLS(pHandle));
|
||||
size_t numOfCols = QH_GET_NUM_OF_COLS(pHandle);
|
||||
memset(pHandle->statis, 0, sizeof(SDataStatis) * numOfCols);
|
||||
tsdbGetDataStatis(&pHandle->rhelper, pHandle->statis, numOfCols);
|
||||
|
||||
*pBlockStatis = pHandle->statis;
|
||||
|
||||
//update the number of NULL data rows
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
if (pHandle->statis[i].numOfNull == -1) {
|
||||
pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows;
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -1575,7 +1584,7 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) {
|
|||
doLoadFileDataBlock(pHandle, pBlock, pCheckInfo);
|
||||
|
||||
// todo refactor
|
||||
int32_t numOfRows = copyDataFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfPoints - 1);
|
||||
int32_t numOfRows = copyDataFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfRows - 1);
|
||||
|
||||
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
|
||||
if (!ASCENDING_TRAVERSE(pHandle->order) && numOfRows < pHandle->outputCapacity) {
|
||||
|
|
Loading…
Reference in New Issue