Merge pull request #17495 from taosdata/fix/liao_cov

refactor: remove redundant codes and do some internal refactor.
This commit is contained in:
Haojun Liao 2022-10-20 13:16:02 +08:00 committed by GitHub
commit c41713cf21
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 256 additions and 289 deletions

View File

@ -340,12 +340,8 @@ typedef struct tDataTypeDescriptor {
} tDataTypeDescriptor; } tDataTypeDescriptor;
extern tDataTypeDescriptor tDataTypes[TSDB_DATA_TYPE_MAX]; extern tDataTypeDescriptor tDataTypes[TSDB_DATA_TYPE_MAX];
bool isValidDataType(int32_t type); bool isValidDataType(int32_t type);
void setVardataNull(void *val, int32_t type);
//void setNull(void *val, int32_t type, int32_t bytes);
//void setNullN(void *val, int32_t type, int32_t bytes, int32_t numOfElems);
void assignVal(char *val, const char *src, int32_t len, int32_t type); void assignVal(char *val, const char *src, int32_t len, int32_t type);
void operateVal(void *dst, void *s1, void *s2, int32_t optr, int32_t type); void operateVal(void *dst, void *s1, void *s2, int32_t optr, int32_t type);
void *getDataMin(int32_t type); void *getDataMin(int32_t type);

View File

@ -237,7 +237,6 @@ typedef enum ELogicConditionType {
#define TSDB_MAX_BYTES_PER_ROW 49151 #define TSDB_MAX_BYTES_PER_ROW 49151
#define TSDB_MAX_TAGS_LEN 16384 #define TSDB_MAX_TAGS_LEN 16384
#define TSDB_MAX_TAGS 128 #define TSDB_MAX_TAGS 128
#define TSDB_MAX_TAG_CONDITIONS 1024
#define TSDB_MAX_COL_TAG_NUM (TSDB_MAX_COLUMNS + TSDB_MAX_TAGS) #define TSDB_MAX_COL_TAG_NUM (TSDB_MAX_COLUMNS + TSDB_MAX_TAGS)
#define TSDB_MAX_JSON_TAG_LEN 16384 #define TSDB_MAX_JSON_TAG_LEN 16384
@ -274,8 +273,6 @@ typedef enum ELogicConditionType {
#define TSDB_PAYLOAD_SIZE TSDB_DEFAULT_PKT_SIZE #define TSDB_PAYLOAD_SIZE TSDB_DEFAULT_PKT_SIZE
#define TSDB_DEFAULT_PAYLOAD_SIZE 5120 // default payload size, greater than PATH_MAX value #define TSDB_DEFAULT_PAYLOAD_SIZE 5120 // default payload size, greater than PATH_MAX value
#define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth
#define TSDB_CQ_SQL_SIZE 1024
#define TSDB_MIN_VNODES 16 #define TSDB_MIN_VNODES 16
#define TSDB_MAX_VNODES 512 #define TSDB_MAX_VNODES 512
@ -285,10 +282,7 @@ typedef enum ELogicConditionType {
#define TSDB_MAX_REPLICA 5 #define TSDB_MAX_REPLICA 5
#define TSDB_TBNAME_COLUMN_INDEX (-1) #define TSDB_TBNAME_COLUMN_INDEX (-1)
#define TSDB_UD_COLUMN_INDEX (-1000)
#define TSDB_RES_COL_ID (-5000)
#define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta #define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta
#define TSDB_MIN_VNODES_PER_DB 1 #define TSDB_MIN_VNODES_PER_DB 1
@ -398,9 +392,6 @@ typedef enum ELogicConditionType {
#define TSDB_MAX_EXPLAIN_RATIO 1 #define TSDB_MAX_EXPLAIN_RATIO 1
#define TSDB_DEFAULT_EXPLAIN_RATIO 0.001 #define TSDB_DEFAULT_EXPLAIN_RATIO 0.001
#define TSDB_MAX_JOIN_TABLE_NUM 10
#define TSDB_MAX_UNION_CLAUSE 5
#define TSDB_DEFAULT_EXPLAIN_VERBOSE false #define TSDB_DEFAULT_EXPLAIN_VERBOSE false
#define TSDB_EXPLAIN_RESULT_ROW_SIZE (16 * 1024) #define TSDB_EXPLAIN_RESULT_ROW_SIZE (16 * 1024)
@ -419,7 +410,6 @@ typedef enum ELogicConditionType {
#endif #endif
#define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type #define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type
#define TSDB_QUERY_TYPE_FREE_RESOURCE 0x01u // free qhandle at vnode
#define TSDB_META_COMPACT_RATIO 0 // disable tsdb meta compact by default #define TSDB_META_COMPACT_RATIO 0 // disable tsdb meta compact by default

View File

@ -244,8 +244,7 @@ int32_t parseTimezone(char* str, int64_t* tzOffset) {
* 2013-04-12T15:52:01.123+0800 * 2013-04-12T15:52:01.123+0800
*/ */
int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, char delim) { int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, char delim) {
int64_t factor = int64_t factor = TSDB_TICK_PER_SECOND(timePrec);
(timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : (timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000);
int64_t tzOffset = 0; int64_t tzOffset = 0;
struct tm tm = {0}; struct tm tm = {0};
@ -339,8 +338,8 @@ static FORCE_INLINE bool validateTm(struct tm* pTm) {
return true; return true;
} }
int32_t parseLocaltime(char* timestr, int32_t len, int64_t* time, int32_t timePrec, char delim) { int32_t parseLocaltime(char* timestr, int32_t len, int64_t* utime, int32_t timePrec, char delim) {
*time = 0; *utime = 0;
struct tm tm = {0}; struct tm tm = {0};
char* str; char* str;
@ -378,15 +377,12 @@ int32_t parseLocaltime(char* timestr, int32_t len, int64_t* time, int32_t timePr
} }
} }
int64_t factor = *utime = TSDB_TICK_PER_SECOND(timePrec) * seconds + fraction;
(timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : (timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000);
*time = factor * seconds + fraction;
return 0; return 0;
} }
int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* time, int32_t timePrec, char delim) { int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* utime, int32_t timePrec, char delim) {
*time = 0; *utime = 0;
struct tm tm = {0}; struct tm tm = {0};
tm.tm_isdst = -1; tm.tm_isdst = -1;
@ -411,7 +407,6 @@ int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* time, int32_t tim
int64_t seconds = taosMktime(&tm); int64_t seconds = taosMktime(&tm);
int64_t fraction = 0; int64_t fraction = 0;
if (*str == '.') { if (*str == '.') {
/* parse the second fraction part */ /* parse the second fraction part */
if ((fraction = parseFraction(str + 1, &str, timePrec)) < 0) { if ((fraction = parseFraction(str + 1, &str, timePrec)) < 0) {
@ -419,9 +414,7 @@ int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* time, int32_t tim
} }
} }
int64_t factor = *utime = TSDB_TICK_PER_SECOND(timePrec) * seconds + fraction;
(timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : (timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000);
*time = factor * seconds + fraction;
return 0; return 0;
} }
@ -437,58 +430,61 @@ char getPrecisionUnit(int32_t precision) {
} }
} }
int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision) { int64_t convertTimePrecision(int64_t utime, int32_t fromPrecision, int32_t toPrecision) {
assert(fromPrecision == TSDB_TIME_PRECISION_MILLI || fromPrecision == TSDB_TIME_PRECISION_MICRO || ASSERT(fromPrecision == TSDB_TIME_PRECISION_MILLI || fromPrecision == TSDB_TIME_PRECISION_MICRO ||
fromPrecision == TSDB_TIME_PRECISION_NANO); fromPrecision == TSDB_TIME_PRECISION_NANO);
assert(toPrecision == TSDB_TIME_PRECISION_MILLI || toPrecision == TSDB_TIME_PRECISION_MICRO || ASSERT(toPrecision == TSDB_TIME_PRECISION_MILLI || toPrecision == TSDB_TIME_PRECISION_MICRO ||
toPrecision == TSDB_TIME_PRECISION_NANO); toPrecision == TSDB_TIME_PRECISION_NANO);
double tempResult = (double)time;
double tempResult = (double)utime;
switch (fromPrecision) { switch (fromPrecision) {
case TSDB_TIME_PRECISION_MILLI: { case TSDB_TIME_PRECISION_MILLI: {
switch (toPrecision) { switch (toPrecision) {
case TSDB_TIME_PRECISION_MILLI: case TSDB_TIME_PRECISION_MILLI:
return time; return utime;
case TSDB_TIME_PRECISION_MICRO: case TSDB_TIME_PRECISION_MICRO:
tempResult *= 1000; tempResult *= 1000;
time *= 1000; utime *= 1000;
goto end_; goto end_;
case TSDB_TIME_PRECISION_NANO: case TSDB_TIME_PRECISION_NANO:
tempResult *= 1000000; tempResult *= 1000000;
time *= 1000000; utime *= 1000000;
goto end_; goto end_;
} }
} // end from milli } // end from milli
case TSDB_TIME_PRECISION_MICRO: { case TSDB_TIME_PRECISION_MICRO: {
switch (toPrecision) { switch (toPrecision) {
case TSDB_TIME_PRECISION_MILLI: case TSDB_TIME_PRECISION_MILLI:
return time / 1000; return utime / 1000;
case TSDB_TIME_PRECISION_MICRO: case TSDB_TIME_PRECISION_MICRO:
return time; return utime;
case TSDB_TIME_PRECISION_NANO: case TSDB_TIME_PRECISION_NANO:
tempResult *= 1000; tempResult *= 1000;
time *= 1000; utime *= 1000;
goto end_; goto end_;
} }
} // end from micro } // end from micro
case TSDB_TIME_PRECISION_NANO: { case TSDB_TIME_PRECISION_NANO: {
switch (toPrecision) { switch (toPrecision) {
case TSDB_TIME_PRECISION_MILLI: case TSDB_TIME_PRECISION_MILLI:
return time / 1000000; return utime / 1000000;
case TSDB_TIME_PRECISION_MICRO: case TSDB_TIME_PRECISION_MICRO:
return time / 1000; return utime / 1000;
case TSDB_TIME_PRECISION_NANO: case TSDB_TIME_PRECISION_NANO:
return time; return utime;
} }
} // end from nano } // end from nano
default: { default: {
assert(0); assert(0);
return time; // only to pass windows compilation return utime; // only to pass windows compilation
} }
} // end switch fromPrecision } // end switch fromPrecision
end_: end_:
if (tempResult >= (double)INT64_MAX) return INT64_MAX; if (tempResult >= (double)INT64_MAX) return INT64_MAX;
if (tempResult <= (double)INT64_MIN) return INT64_MIN; // INT64_MIN means NULL if (tempResult <= (double)INT64_MIN) return INT64_MIN; // INT64_MIN means NULL
return time; return utime;
} }
// !!!!notice:there are precision problems, double lose precison if time is too large, for example: // !!!!notice:there are precision problems, double lose precison if time is too large, for example:

View File

@ -16,7 +16,6 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "ttypes.h" #include "ttypes.h"
#include "tcompression.h" #include "tcompression.h"
#include "trow.h"
const int32_t TYPE_BYTES[16] = { const int32_t TYPE_BYTES[16] = {
-1, // TSDB_DATA_TYPE_NULL -1, // TSDB_DATA_TYPE_NULL
@ -86,18 +85,6 @@ FORCE_INLINE void *getDataMax(int32_t type) {
bool isValidDataType(int32_t type) { return type >= TSDB_DATA_TYPE_NULL && type < TSDB_DATA_TYPE_MAX; } bool isValidDataType(int32_t type) { return type >= TSDB_DATA_TYPE_NULL && type < TSDB_DATA_TYPE_MAX; }
void setVardataNull(void *val, int32_t type) {
if (type == TSDB_DATA_TYPE_BINARY) {
varDataSetLen(val, sizeof(int8_t));
*(uint8_t *)varDataVal(val) = TSDB_DATA_BINARY_NULL;
} else if (type == TSDB_DATA_TYPE_NCHAR) {
varDataSetLen(val, sizeof(int32_t));
*(uint32_t *)varDataVal(val) = TSDB_DATA_NCHAR_NULL;
} else {
assert(0);
}
}
#define POINTER_SHIFT(p, b) ((void *)((char *)(p) + (b))) #define POINTER_SHIFT(p, b) ((void *)((char *)(p) + (b)))
void assignVal(char *val, const char *src, int32_t len, int32_t type) { void assignVal(char *val, const char *src, int32_t len, int32_t type) {

View File

@ -194,7 +194,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
} }
} else if ((pr->type & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) { } else if ((pr->type & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) {
for (int32_t i = pr->tableIndex; i < numOfTables; ++i) { for (int32_t i = pr->tableIndex; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i); STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(pr->pTableList, i);
code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h); code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;

View File

@ -120,44 +120,46 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
return &pInfo->blockData[1]; return &pInfo->blockData[1];
} }
pInfo->currentLoadBlockIndex ^= 1; if (pIter->pSttBlk == NULL) {
if (pIter->pSttBlk != NULL) { // current block not loaded yet return NULL;
int64_t st = taosGetTimestampUs();
SBlockData *pBlock = &pInfo->blockData[pInfo->currentLoadBlockIndex];
TABLEID id = {0};
if (pIter->pSttBlk->suid != 0) {
id.suid = pIter->pSttBlk->suid;
} else {
id.uid = pIter->uid;
}
code = tBlockDataInit(pBlock, &id, pInfo->pSchema, pInfo->colIds, pInfo->numOfCols);
if (code != TSDB_CODE_SUCCESS) {
goto _exit;
}
code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlock);
if (code != TSDB_CODE_SUCCESS) {
goto _exit;
}
double el = (taosGetTimestampUs() - st) / 1000.0;
pInfo->elapsedTime += el;
pInfo->loadBlocks += 1;
tsdbDebug("read last block, total load:%d, trigger by uid:%" PRIu64
", last file index:%d, last block index:%d, entry:%d, %p, elapsed time:%.2f ms, %s",
pInfo->loadBlocks, pIter->uid, pIter->iStt, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock, el,
idStr);
pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk;
tsdbDebug("last block index list:%d, %d, %s", pInfo->blockIndex[0], pInfo->blockIndex[1], idStr);
pIter->iRow = (pIter->backward) ? pInfo->blockData[pInfo->currentLoadBlockIndex].nRow : -1;
} }
// current block not loaded yet
pInfo->currentLoadBlockIndex ^= 1;
int64_t st = taosGetTimestampUs();
SBlockData *pBlock = &pInfo->blockData[pInfo->currentLoadBlockIndex];
TABLEID id = {0};
if (pIter->pSttBlk->suid != 0) {
id.suid = pIter->pSttBlk->suid;
} else {
id.uid = pIter->uid;
}
code = tBlockDataInit(pBlock, &id, pInfo->pSchema, pInfo->colIds, pInfo->numOfCols);
if (code != TSDB_CODE_SUCCESS) {
goto _exit;
}
code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlock);
if (code != TSDB_CODE_SUCCESS) {
goto _exit;
}
double el = (taosGetTimestampUs() - st) / 1000.0;
pInfo->elapsedTime += el;
pInfo->loadBlocks += 1;
tsdbDebug("read last block, total load:%d, trigger by uid:%" PRIu64
", last file index:%d, last block index:%d, entry:%d, %p, elapsed time:%.2f ms, %s",
pInfo->loadBlocks, pIter->uid, pIter->iStt, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock, el,
idStr);
pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk;
tsdbDebug("last block index list:%d, %d, %s", pInfo->blockIndex[0], pInfo->blockIndex[1], idStr);
pIter->iRow = (pIter->backward) ? pInfo->blockData[pInfo->currentLoadBlockIndex].nRow : -1;
return &pInfo->blockData[pInfo->currentLoadBlockIndex]; return &pInfo->blockData[pInfo->currentLoadBlockIndex];
_exit: _exit:
@ -259,7 +261,8 @@ static int32_t binarySearchForStartRowIndex(uint64_t *uidList, int32_t num, uint
int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid, int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo,
const char *idStr) { const char *idStr) {
int32_t code = 0; int32_t code = TSDB_CODE_SUCCESS;
*pIter = taosMemoryCalloc(1, sizeof(SLDataIter)); *pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
if (*pIter == NULL) { if (*pIter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
@ -336,7 +339,10 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
(*pIter)->iRow = ((*pIter)->backward) ? (*pIter)->pSttBlk->nRow : -1; (*pIter)->iRow = ((*pIter)->backward) ? (*pIter)->pSttBlk->nRow : -1;
} }
return code;
_exit: _exit:
taosMemoryFree(*pIter);
return code; return code;
} }
@ -473,7 +479,7 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
int32_t iBlockL = pIter->iSttBlk; int32_t iBlockL = pIter->iSttBlk;
SBlockData *pBlockData = loadLastBlock(pIter, idStr); SBlockData *pBlockData = loadLastBlock(pIter, idStr);
if (pBlockData == NULL && terrno != TSDB_CODE_SUCCESS) { if (pBlockData == NULL || terrno != TSDB_CODE_SUCCESS) {
goto _exit; goto _exit;
} }

View File

@ -1047,11 +1047,16 @@ static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, v
return pLeftBlock->offset > pRightBlock->offset ? 1 : -1; return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
} }
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter) { static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) {
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
if (pBlockInfo != NULL) { if (pBlockInfo != NULL) {
STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
int32_t* mapDataIndex = taosArrayGet(pScanInfo->pBlockList, pBlockInfo->tbBlockIdx); if (pScanInfo == NULL) {
tsdbError("failed to locate the uid:%"PRIu64" in query table uid list, %s", pBlockInfo->uid, idStr);
return TSDB_CODE_INVALID_PARA;
}
int32_t* mapDataIndex = taosArrayGet(pScanInfo->pBlockList, pBlockInfo->tbBlockIdx);
tMapDataGetItemByIdx(&pScanInfo->mapData, *mapDataIndex, &pBlockIter->block, tGetDataBlk); tMapDataGetItemByIdx(&pScanInfo->mapData, *mapDataIndex, &pBlockIter->block, tGetDataBlk);
} }
@ -1135,7 +1140,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
pBlockIter->index = asc ? 0 : (numOfBlocks - 1); pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
cleanupBlockOrderSupporter(&sup); cleanupBlockOrderSupporter(&sup);
doSetCurrentBlock(pBlockIter); doSetCurrentBlock(pBlockIter, pReader->idStr);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1175,12 +1180,12 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
taosMemoryFree(pTree); taosMemoryFree(pTree);
pBlockIter->index = asc ? 0 : (numOfBlocks - 1); pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
doSetCurrentBlock(pBlockIter); doSetCurrentBlock(pBlockIter, pReader->idStr);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static bool blockIteratorNext(SDataBlockIter* pBlockIter) { static bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr) {
bool asc = ASCENDING_TRAVERSE(pBlockIter->order); bool asc = ASCENDING_TRAVERSE(pBlockIter->order);
int32_t step = asc ? 1 : -1; int32_t step = asc ? 1 : -1;
@ -1189,7 +1194,7 @@ static bool blockIteratorNext(SDataBlockIter* pBlockIter) {
} }
pBlockIter->index += step; pBlockIter->index += step;
doSetCurrentBlock(pBlockIter); doSetCurrentBlock(pBlockIter, idStr);
return true; return true;
} }
@ -1260,7 +1265,7 @@ static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t
ASSERT(pBlockInfo->uid == fblock.uid && pBlockInfo->tbBlockIdx == fblock.tbBlockIdx); ASSERT(pBlockInfo->uid == fblock.uid && pBlockInfo->tbBlockIdx == fblock.tbBlockIdx);
} }
doSetCurrentBlock(pBlockIter); doSetCurrentBlock(pBlockIter, "");
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -2190,6 +2195,8 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
} }
static int32_t buildComposedDataBlock(STsdbReader* pReader) { static int32_t buildComposedDataBlock(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
SSDataBlock* pResBlock = pReader->pResBlock; SSDataBlock* pResBlock = pReader->pResBlock;
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
@ -2200,6 +2207,13 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
STableBlockScanInfo* pBlockScanInfo = NULL; STableBlockScanInfo* pBlockScanInfo = NULL;
if (pBlockInfo != NULL) { if (pBlockInfo != NULL) {
pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
if (pBlockScanInfo == NULL) {
code = TSDB_CODE_INVALID_PARA;
tsdbError("failed to locate the uid:%"PRIu64" in query table uid list, total tables:%d, %s",
pBlockInfo->uid, taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
goto _end;
}
SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter); SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader); TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);
@ -2276,7 +2290,7 @@ _end:
pResBlock->info.rows, el, pReader->idStr); pResBlock->info.rows, el, pReader->idStr);
} }
return TSDB_CODE_SUCCESS; return code;
} }
void setComposedBlockFlag(STsdbReader* pReader, bool composed) { pReader->status.composedDataBlock = composed; } void setComposedBlockFlag(STsdbReader* pReader, bool composed) { pReader->status.composedDataBlock = composed; }
@ -2732,7 +2746,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
// current block are exhausted, try the next file block // current block are exhausted, try the next file block
if (pDumpInfo->allDumped) { if (pDumpInfo->allDumped) {
// try next data block in current file // try next data block in current file
bool hasNext = blockIteratorNext(&pReader->status.blockIter); bool hasNext = blockIteratorNext(&pReader->status.blockIter, pReader->idStr);
if (hasNext) { // check for the next block in the block accessed order list if (hasNext) { // check for the next block in the block accessed order list
initBlockDumpInfo(pReader, pBlockIter); initBlockDumpInfo(pReader, pBlockIter);
} else { } else {
@ -3658,10 +3672,6 @@ void tsdbReaderClose(STsdbReader* pReader) {
taosMemoryFree(pLReader); taosMemoryFree(pLReader);
} }
if (pReader->innerReader[0] != 0) {
tsdbUntakeReadSnap(pReader->innerReader[0]->pTsdb, pReader->innerReader[0]->pReadSnap, pReader->idStr);
}
tsdbDebug( tsdbDebug(
"%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64 "%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
" SMA-time:%.2f ms, fileBlocks:%" PRId64 " SMA-time:%.2f ms, fileBlocks:%" PRId64
@ -3849,8 +3859,14 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
return pReader->pResBlock->pDataBlock; return pReader->pResBlock->pDataBlock;
} }
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pStatus->blockIter); SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter);
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
if (pBlockScanInfo == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
return NULL;
}
int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData, pBlockScanInfo->uid); int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData, pBlockScanInfo->uid);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -3979,7 +3995,7 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows); int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);
pTableBlockInfo->blockRowsHisto[bucketIndex]++; pTableBlockInfo->blockRowsHisto[bucketIndex]++;
hasNext = blockIteratorNext(&pStatus->blockIter); hasNext = blockIteratorNext(&pStatus->blockIter, pReader->idStr);
} else { } else {
code = initForFirstBlockInFile(pReader, pBlockIter); code = initForFirstBlockInFile(pReader, pBlockIter);
if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) { if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {

View File

@ -954,9 +954,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode, SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode,
const char* pUser, SExecTaskInfo* pTaskInfo); const char* pUser, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pNode, SExecTaskInfo* pTaskInfo);
SSDataBlock* pResultBlock, SNode* pCondition, SExprInfo* pScalarExprInfo,
int32_t numOfScalarExpr, bool mergeResult, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode,
SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
@ -980,9 +978,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
SExecTaskInfo* pTaskInfo, int32_t numOfChild); SExecTaskInfo* pTaskInfo, int32_t numOfChild);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode, SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode *pAggNode, SExecTaskInfo* pTaskInfo);
SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid,
SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo); SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo);

View File

@ -162,7 +162,9 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
if (pTableList->map != NULL) { if (pTableList->map != NULL) {
int64_t* groupId = taosHashGet(pTableList->map, &pInfo->pRes->info.uid, sizeof(int64_t)); int64_t* groupId = taosHashGet(pTableList->map, &pInfo->pRes->info.uid, sizeof(int64_t));
pInfo->pRes->info.groupId = *groupId; if (groupId != NULL) {
pInfo->pRes->info.groupId = *groupId;
}
} else { } else {
ASSERT(taosArrayGetSize(pTableList->pTableList) == 1); ASSERT(taosArrayGetSize(pTableList->pTableList) == 1);
STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, 0); STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, 0);

View File

@ -234,8 +234,11 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
while (!taosQueueEmpty(pDeleter->pDataBlocks)) { while (!taosQueueEmpty(pDeleter->pDataBlocks)) {
SDataDeleterBuf* pBuf = NULL; SDataDeleterBuf* pBuf = NULL;
taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf); taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
taosMemoryFreeClear(pBuf->pData);
taosFreeQitem(pBuf); if (pBuf != NULL) {
taosMemoryFreeClear(pBuf->pData);
taosFreeQitem(pBuf);
}
} }
taosCloseQueue(pDeleter->pDataBlocks); taosCloseQueue(pDeleter->pDataBlocks);
taosThreadMutexDestroy(&pDeleter->mutex); taosThreadMutexDestroy(&pDeleter->mutex);

View File

@ -358,7 +358,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
SDataSinkMgtCfg cfg = {.maxDataBlockNum = 10000, .maxDataBlockNumPerQuery = 5000}; SDataSinkMgtCfg cfg = {.maxDataBlockNum = 10000, .maxDataBlockNumPerQuery = 5000};
code = dsDataSinkMgtInit(&cfg); code = dsDataSinkMgtInit(&cfg);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("failed to dsDataSinkMgtInit, code: %s", tstrerror(code)); qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str);
goto _error; goto _error;
} }
@ -366,7 +366,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
void* pSinkParam = NULL; void* pSinkParam = NULL;
code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo, readHandle); code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo, readHandle);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("failed to createDataSinkParam, code: %s", tstrerror(code)); qError("failed to createDataSinkParam, vgId:%d, code:%s, %s", vgId, tstrerror(code), (*pTask)->id.str);
goto _error; goto _error;
} }

View File

@ -3048,32 +3048,40 @@ void cleanupExprSupp(SExprSupp* pSupp) {
taosMemoryFree(pSupp->rowEntryInfoOffset); taosMemoryFree(pSupp->rowEntryInfoOffset);
} }
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode,SExecTaskInfo* pTaskInfo) {
SSDataBlock* pResultBlock, SNode* pCondition, SExprInfo* pScalarExprInfo,
int32_t numOfScalarExpr, bool mergeResult, SExecTaskInfo* pTaskInfo) {
SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo)); SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
goto _error; goto _error;
} }
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; SSDataBlock* pResBlock = createResDataBlock(pAggNode->node.pOutputDataBlockDesc);
initBasicInfo(&pInfo->binfo, pResBlock);
int32_t numOfScalarExpr = 0;
SExprInfo* pScalarExprInfo = NULL;
if (pAggNode->pExprs != NULL) {
pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
}
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
initBasicInfo(&pInfo->binfo, pResultBlock);
code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr); code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
pInfo->binfo.mergeResultBlock = mergeResult; pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;
pInfo->groupId = UINT64_MAX; pInfo->groupId = UINT64_MAX;
pInfo->pCondition = pCondition; pInfo->pCondition = pAggNode->node.pConditions;
pOperator->name = "TableAggregate"; pOperator->name = "TableAggregate";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
pOperator->blocking = true; pOperator->blocking = true;
@ -3332,8 +3340,6 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
return pTaskInfo; return pTaskInfo;
} }
static SArray* extractColumnInfo(SNodeList* pNodeList);
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode); SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);
int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) { int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) {
@ -3710,22 +3716,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOptr = createProjectOperatorInfo(ops[0], (SProjectPhysiNode*)pPhyNode, pTaskInfo); pOptr = createProjectOperatorInfo(ops[0], (SProjectPhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_HASH_AGG == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_AGG == type) {
SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode; SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
int32_t numOfScalarExpr = 0;
SExprInfo* pScalarExprInfo = NULL;
if (pAggNode->pExprs != NULL) {
pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
}
if (pAggNode->pGroupKeys != NULL) { if (pAggNode->pGroupKeys != NULL) {
SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys); pOptr = createGroupOperatorInfo(ops[0], pAggNode, pTaskInfo);
pOptr = createGroupOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions,
pScalarExprInfo, numOfScalarExpr, pTaskInfo);
} else { } else {
pOptr = createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pAggNode->node.pConditions, pOptr = createAggregateOperatorInfo(ops[0], pAggNode, pTaskInfo);
pScalarExprInfo, numOfScalarExpr, pAggNode->mergeDataBlock, pTaskInfo);
} }
} else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type) {
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
@ -3815,39 +3809,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return pOptr; return pOptr;
} }
SArray* extractColumnInfo(SNodeList* pNodeList) {
size_t numOfCols = LIST_LENGTH(pNodeList);
SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
if (pList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
for (int32_t i = 0; i < numOfCols; ++i) {
STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) {
SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
SColumn c = extractColumnFromColumnNode(pColNode);
taosArrayPush(pList, &c);
} else if (nodeType(pNode->pExpr) == QUERY_NODE_VALUE) {
SValueNode* pValNode = (SValueNode*)pNode->pExpr;
SColumn c = {0};
c.slotId = pNode->slotId;
c.colId = pNode->slotId;
c.type = pValNode->node.type;
c.bytes = pValNode->node.resType.bytes;
c.scale = pValNode->node.resType.scale;
c.precision = pValNode->node.resType.precision;
taosArrayPush(pList, &c);
}
}
return pList;
}
static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanInfo** ppInfo) { static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanInfo** ppInfo) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream == 0) { if (pOperator->numOfDownstream == 0) {
@ -4071,6 +4032,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
(*pTaskInfo)->sql = sql; (*pTaskInfo)->sql = sql;
sql = NULL; sql = NULL;
(*pTaskInfo)->pSubplan = pPlan; (*pTaskInfo)->pSubplan = pPlan;
(*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, &(*pTaskInfo)->tableqinfoList, (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, &(*pTaskInfo)->tableqinfoList,
pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user); pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user);

View File

@ -30,6 +30,7 @@ static void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDa
static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity); static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity);
static int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, static int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData,
int16_t bytes, uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup); int16_t bytes, uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup);
static SArray* extractColumnInfo(SNodeList* pNodeList);
static void freeGroupKey(void* param) { static void freeGroupKey(void* param) {
SGroupKeys* pKey = (SGroupKeys*)param; SGroupKeys* pKey = (SGroupKeys*)param;
@ -61,7 +62,7 @@ static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char**
int32_t numOfGroupCols = taosArrayGetSize(pGroupColList); int32_t numOfGroupCols = taosArrayGetSize(pGroupColList);
for (int32_t i = 0; i < numOfGroupCols; ++i) { for (int32_t i = 0; i < numOfGroupCols; ++i) {
SColumn* pCol = taosArrayGet(pGroupColList, i); SColumn* pCol = (SColumn*) taosArrayGet(pGroupColList, i);
(*keyLen) += pCol->bytes; // actual data + null_flag (*keyLen) += pCol->bytes; // actual data + null_flag
SGroupKeys key = {0}; SGroupKeys key = {0};
@ -396,41 +397,48 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
return buildGroupResultDataBlock(pOperator); return buildGroupResultDataBlock(pOperator);
} }
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode *pAggNode, SExecTaskInfo* pTaskInfo) {
SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) {
SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo)); SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
goto _error; goto _error;
} }
pInfo->pGroupCols = pGroupColList; SSDataBlock* pResBlock = createResDataBlock(pAggNode->node.pOutputDataBlockDesc);
pInfo->pCondition = pCondition; initBasicInfo(&pInfo->binfo, pResBlock);
int32_t numOfScalarExpr = 0;
SExprInfo* pScalarExprInfo = NULL;
if (pAggNode->pExprs != NULL) {
pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
}
pInfo->pGroupCols = extractColumnInfo(pAggNode->pGroupKeys);
pInfo->pCondition = pAggNode->node.pConditions;
int32_t code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr); int32_t code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList); int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, pInfo->groupKeyLen, pTaskInfo->id.str); code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, pInfo->groupKeyLen, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
initBasicInfo(&pInfo->binfo, pResultBlock);
initResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo);
pOperator->name = "GroupbyAggOperator"; pOperator->name = "GroupbyAggOperator";
pOperator->blocking = true; pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
// pOperator->operatorType = OP_Groupby;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
@ -451,8 +459,6 @@ _error:
} }
static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
// SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SPartitionOperatorInfo* pInfo = pOperator->info; SPartitionOperatorInfo* pInfo = pOperator->info;
for (int32_t j = 0; j < pBlock->info.rows; ++j) { for (int32_t j = 0; j < pBlock->info.rows; ++j) {
@ -760,7 +766,6 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
int32_t numOfCols = 0; int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &numOfCols); SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &numOfCols);
pInfo->pGroupCols = extractPartitionColInfo(pPartNode->pPartitionKeys); pInfo->pGroupCols = extractPartitionColInfo(pPartNode->pPartitionKeys);
if (pPartNode->pExprs != NULL) { if (pPartNode->pExprs != NULL) {
@ -781,14 +786,13 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
uint32_t defaultPgsz = 0; uint32_t defaultPgsz = 0;
uint32_t defaultBufsz = 0; uint32_t defaultBufsz = 0;
SSDataBlock* pResBlock = createResDataBlock(pPartNode->node.pOutputDataBlockDesc); pInfo->binfo.pRes = createResDataBlock(pPartNode->node.pOutputDataBlockDesc);
getBufferPgSize(pResBlock->info.rowSize, &defaultPgsz, &defaultBufsz); getBufferPgSize(pInfo->binfo.pRes->info.rowSize, &defaultPgsz, &defaultBufsz);
if (!osTempSpaceAvailable()) { if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_AVAIL_DISK; terrno = TSDB_CODE_NO_AVAIL_DISK;
pTaskInfo->code = terrno; pTaskInfo->code = terrno;
qError("Create partition operator info failed since %s", terrstr(terrno)); qError("Create partition operator info failed since %s", terrstr(terrno));
blockDataDestroy(pResBlock);
goto _error; goto _error;
} }
@ -797,8 +801,8 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
goto _error; goto _error;
} }
pInfo->rowCapacity = blockDataGetCapacityInRow(pResBlock, getBufPageSize(pInfo->pBuf)); pInfo->rowCapacity = blockDataGetCapacityInRow(pInfo->binfo.pRes, getBufPageSize(pInfo->pBuf));
pInfo->columnOffset = setupColumnOffset(pResBlock, pInfo->rowCapacity); pInfo->columnOffset = setupColumnOffset(pInfo->binfo.pRes, pInfo->rowCapacity);
code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols); code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
@ -808,7 +812,6 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
pOperator->blocking = true; pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
pInfo->binfo.pRes = pResBlock;
pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->info = pInfo; pOperator->info = pInfo;
@ -1102,3 +1105,37 @@ _error:
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
return NULL; return NULL;
} }
SArray* extractColumnInfo(SNodeList* pNodeList) {
size_t numOfCols = LIST_LENGTH(pNodeList);
SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
if (pList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
for (int32_t i = 0; i < numOfCols; ++i) {
STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) {
SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
SColumn c = extractColumnFromColumnNode(pColNode);
taosArrayPush(pList, &c);
} else if (nodeType(pNode->pExpr) == QUERY_NODE_VALUE) {
SValueNode* pValNode = (SValueNode*)pNode->pExpr;
SColumn c = {0};
c.slotId = pNode->slotId;
c.colId = pNode->slotId;
c.type = pValNode->node.type;
c.bytes = pValNode->node.resType.bytes;
c.scale = pValNode->node.resType.scale;
c.precision = pValNode->node.resType.precision;
taosArrayPush(pList, &c);
}
}
return pList;
}

View File

@ -59,15 +59,16 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) { SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) {
SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo)); SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
int32_t code = TSDB_CODE_SUCCESS;
if (pOperator == NULL || pInfo == NULL) { if (pOperator == NULL || pInfo == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error; goto _error;
} }
int32_t numOfCols = 0;
SSDataBlock* pResBlock = createResDataBlock(pJoinNode->node.pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pJoinNode->node.pOutputDataBlockDesc);
SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols);
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols);
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pRes = pResBlock; pInfo->pRes = pResBlock;
@ -84,8 +85,18 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
if (pJoinNode->pOnConditions != NULL && pJoinNode->node.pConditions != NULL) { if (pJoinNode->pOnConditions != NULL && pJoinNode->node.pConditions != NULL) {
pInfo->pCondAfterMerge = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); pInfo->pCondAfterMerge = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION);
if (pInfo->pCondAfterMerge == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pInfo->pCondAfterMerge); SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pInfo->pCondAfterMerge);
pLogicCond->pParameterList = nodesMakeList(); pLogicCond->pParameterList = nodesMakeList();
if (pLogicCond->pParameterList == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->pOnConditions)); nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->pOnConditions));
nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->node.pConditions)); nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->node.pConditions));
pLogicCond->condType = LOGIC_COND_TYPE_AND; pLogicCond->condType = LOGIC_COND_TYPE_AND;
@ -106,7 +117,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL, NULL, NULL); createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL, NULL, NULL);
int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream); code = appendDownstream(pOperator, pDownstream, numOfDownstream);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
@ -114,9 +125,12 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
return pOperator; return pOperator;
_error: _error:
taosMemoryFree(pInfo); if (pInfo != NULL) {
destroyMergeJoinOperator(pInfo);
}
taosMemoryFree(pOperator); taosMemoryFree(pOperator);
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; pTaskInfo->code = code;
return NULL; return NULL;
} }

View File

@ -494,7 +494,13 @@ void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* p
SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .param = pMeta, .columnData = &infoData}; SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .param = pMeta, .columnData = &infoData};
SScalarParam param = {.columnData = pColInfoData}; SScalarParam param = {.columnData = pColInfoData};
fpSet.process(&srcParam, 1, &param);
if (fpSet.process != NULL) {
fpSet.process(&srcParam, 1, &param);
} else {
qError("failed to get the corresponding callback function, functionId:%d", functionId);
}
colDataDestroy(&infoData); colDataDestroy(&infoData);
} }

View File

@ -719,12 +719,16 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
SDataBlockDescNode* pDescNode = pPhyNode->pOutputDataBlockDesc; SDataBlockDescNode* pDescNode = pPhyNode->pOutputDataBlockDesc;
pInfo->binfo.pRes = createResDataBlock(pDescNode); int32_t code = TSDB_CODE_SUCCESS;
int32_t rowSize = pInfo->binfo.pRes->info.rowSize; if (pInfo == NULL || pOperator == NULL) {
if (pInfo == NULL || pOperator == NULL || rowSize > 100 * 1024 * 1024) { code = TSDB_CODE_OUT_OF_MEMORY;
goto _error; goto _error;
} }
pInfo->binfo.pRes = createResDataBlock(pDescNode);
int32_t rowSize = pInfo->binfo.pRes->info.rowSize;
ASSERT(rowSize < 100 * 1024 * 1024);
SArray* pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys); SArray* pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys);
int32_t numOfOutputCols = 0; int32_t numOfOutputCols = 0;
SArray* pColMatchColInfo = SArray* pColMatchColInfo =
@ -737,6 +741,9 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
pInfo->pSortInfo = pSortInfo; pInfo->pSortInfo = pSortInfo;
pInfo->pColMatchInfo = pColMatchColInfo; pInfo->pColMatchInfo = pColMatchColInfo;
pInfo->pInputBlock = pInputBlock; pInfo->pInputBlock = pInputBlock;
pInfo->bufPageSize = getProperSortPageSize(rowSize);
pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); // one additional is reserved for merged result.
pOperator->name = "MultiwayMerge"; pOperator->name = "MultiwayMerge";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE;
pOperator->blocking = false; pOperator->blocking = false;
@ -744,15 +751,10 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pInfo->bufPageSize = getProperSortPageSize(rowSize);
// one additional is reserved for merged result.
pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1);
pOperator->fpSet = createOperatorFpSet(doOpenMultiwayMergeOperator, doMultiwayMerge, NULL, NULL, pOperator->fpSet = createOperatorFpSet(doOpenMultiwayMergeOperator, doMultiwayMerge, NULL, NULL,
destroyMultiwayMergeOperatorInfo, NULL, NULL, getMultiwayMergeExplainExecInfo); destroyMultiwayMergeOperatorInfo, NULL, NULL, getMultiwayMergeExplainExecInfo);
int32_t code = appendDownstream(pOperator, downStreams, numStreams); code = appendDownstream(pOperator, downStreams, numStreams);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }

View File

@ -251,6 +251,7 @@ SLHashObj* tHashInit(int32_t inMemPages, int32_t pageSize, _hash_fn_t fn, int32_
if (!osTempSpaceAvailable()) { if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_AVAIL_DISK; terrno = TSDB_CODE_NO_AVAIL_DISK;
printf("tHash Init failed since %s", terrstr(terrno)); printf("tHash Init failed since %s", terrstr(terrno));
taosMemoryFree(pHashObj);
return NULL; return NULL;
} }

View File

@ -140,6 +140,7 @@ static int32_t doAddNewExternalMemSource(SDiskbasedBuf* pBuf, SArray* pAllSource
int32_t* sourceId, SArray* pPageIdList) { int32_t* sourceId, SArray* pPageIdList) {
SSortSource* pSource = taosMemoryCalloc(1, sizeof(SSortSource)); SSortSource* pSource = taosMemoryCalloc(1, sizeof(SSortSource));
if (pSource == NULL) { if (pSource == NULL) {
taosArrayDestroy(pPageIdList);
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
@ -155,6 +156,7 @@ static int32_t doAddNewExternalMemSource(SDiskbasedBuf* pBuf, SArray* pAllSource
int32_t numOfRows = int32_t numOfRows =
(getBufPageSize(pBuf) - blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock))) / rowSize; (getBufPageSize(pBuf) - blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock))) / rowSize;
ASSERT(numOfRows > 0); ASSERT(numOfRows > 0);
return blockDataEnsureCapacity(pSource->src.pBlock, numOfRows); return blockDataEnsureCapacity(pSource->src.pBlock, numOfRows);
} }
@ -224,6 +226,22 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
int32_t code = 0; int32_t code = 0;
// multi-pass internal merge sort is required
if (pHandle->pBuf == NULL) {
if (!osTempSpaceAvailable()) {
code = TSDB_CODE_NO_AVAIL_DISK;
qError("Sort compare init failed since %s", terrstr(code));
return code;
}
code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize,
"sortComparInit", tsTempDir);
dBufSetPrintInfo(pHandle->pBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
if (pHandle->type == SORT_SINGLESOURCE_SORT) { if (pHandle->type == SORT_SINGLESOURCE_SORT) {
for (int32_t i = 0; i < cmpParam->numOfSources; ++i) { for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
SSortSource* pSource = cmpParam->pSources[i]; SSortSource* pSource = cmpParam->pSources[i];
@ -245,22 +263,6 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
releaseBufPage(pHandle->pBuf, pPage); releaseBufPage(pHandle->pBuf, pPage);
} }
} else { } else {
// multi-pass internal merge sort is required
if (pHandle->pBuf == NULL) {
if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
code = terrno;
qError("Sort compare init failed since %s", terrstr(terrno));
return code;
}
code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize,
"sortComparInit", tsTempDir);
dBufSetPrintInfo(pHandle->pBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
for (int32_t i = 0; i < cmpParam->numOfSources; ++i) { for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
SSortSource* pSource = cmpParam->pSources[i]; SSortSource* pSource = cmpParam->pSources[i];
pSource->src.pBlock = pHandle->fetchfp(pSource->param); pSource->src.pBlock = pHandle->fetchfp(pSource->param);
@ -507,12 +509,14 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
int32_t code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, i * numOfInputSources, end, pHandle); int32_t code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, i * numOfInputSources, end, pHandle);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pResList);
return code; return code;
} }
code = code =
tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn); tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pResList);
return code; return code;
} }
@ -520,12 +524,16 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
while (1) { while (1) {
SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows); SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows);
if (pDataBlock == NULL) { if (pDataBlock == NULL) {
taosArrayDestroy(pResList);
taosArrayDestroy(pPageIdList);
break; break;
} }
int32_t pageId = -1; int32_t pageId = -1;
void* pPage = getNewBufPage(pHandle->pBuf, &pageId); void* pPage = getNewBufPage(pHandle->pBuf, &pageId);
if (pPage == NULL) { if (pPage == NULL) {
taosArrayDestroy(pResList);
taosArrayDestroy(pPageIdList);
return terrno; return terrno;
} }

View File

@ -1178,9 +1178,7 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
GET_TYPED_DATA(timeUnit, int64_t, GET_PARAM_TYPE(&pInput[1]), pInput[1].columnData->pData); GET_TYPED_DATA(timeUnit, int64_t, GET_PARAM_TYPE(&pInput[1]), pInput[1].columnData->pData);
GET_TYPED_DATA(timePrec, int64_t, GET_PARAM_TYPE(&pInput[2]), pInput[2].columnData->pData); GET_TYPED_DATA(timePrec, int64_t, GET_PARAM_TYPE(&pInput[2]), pInput[2].columnData->pData);
int64_t factor = int64_t factor = TSDB_TICK_PER_SECOND(timePrec);
(timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : (timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000);
int64_t unit = timeUnit * 1000 / factor; int64_t unit = timeUnit * 1000 / factor;
for (int32_t i = 0; i < pInput[0].numOfRows; ++i) { for (int32_t i = 0; i < pInput[0].numOfRows; ++i) {
@ -1372,9 +1370,7 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
GET_TYPED_DATA(timePrec, int64_t, GET_PARAM_TYPE(&pInput[2]), pInput[2].columnData->pData); GET_TYPED_DATA(timePrec, int64_t, GET_PARAM_TYPE(&pInput[2]), pInput[2].columnData->pData);
} }
int64_t factor = int64_t factor = TSDB_TICK_PER_SECOND(timePrec);
(timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : (timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000);
int32_t numOfRows = 0; int32_t numOfRows = 0;
for (int32_t i = 0; i < inputNum; ++i) { for (int32_t i = 0; i < inputNum; ++i) {
if (pInput[i].numOfRows > numOfRows) { if (pInput[i].numOfRows > numOfRows) {

View File

@ -1426,57 +1426,6 @@ void vectorAssign(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut,
pOut->numOfQualified = pRight->numOfQualified * pOut->numOfRows; pOut->numOfQualified = pRight->numOfQualified * pOut->numOfRows;
} }
void vectorConcat(SScalarParam *pLeft, SScalarParam *pRight, void *out, int32_t _ord) {
#if 0
int32_t len = pLeft->bytes + pRight->bytes;
int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->numOfRows, pRight->numOfRows) - 1;
int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1;
char *output = (char *)out;
if (pLeft->numOfRows == pRight->numOfRows) {
for (; i < pRight->numOfRows && i >= 0; i += step, output += len) {
char* left = POINTER_SHIFT(pLeft->data, pLeft->bytes * i);
char* right = POINTER_SHIFT(pRight->data, pRight->bytes * i);
if (isNull(left, pLeftCol->info.type) || isNull(right, pRight->info.type)) {
setVardataNull(output, TSDB_DATA_TYPE_BINARY);
continue;
}
// todo define a macro
memcpy(varDataVal(output), varDataVal(left), varDataLen(left));
memcpy(varDataVal(output) + varDataLen(left), varDataVal(right), varDataLen(right));
varDataSetLen(output, varDataLen(left) + varDataLen(right));
}
} else if (pLeft->numOfRows == 1) {
for (; i >= 0 && i < pRight->numOfRows; i += step, output += len) {
char *right = POINTER_SHIFT(pRight->data, pRight->bytes * i);
if (isNull(pLeft->data, pLeftCol->info.type) || isNull(right, pRight->info.type)) {
setVardataNull(output, TSDB_DATA_TYPE_BINARY);
continue;
}
memcpy(varDataVal(output), varDataVal(pLeft->data), varDataLen(pLeft->data));
memcpy(varDataVal(output) + varDataLen(pLeft->data), varDataVal(right), varDataLen(right));
varDataSetLen(output, varDataLen(pLeft->data) + varDataLen(right));
}
} else if (pRight->numOfRows == 1) {
for (; i >= 0 && i < pLeft->numOfRows; i += step, output += len) {
char* left = POINTER_SHIFT(pLeft->data, pLeft->bytes * i);
if (isNull(left, pLeftCol->info.type) || isNull(pRight->data, pRight->info.type)) {
SET_DOUBLE_NULL(output);
continue;
}
memcpy(varDataVal(output), varDataVal(left), varDataLen(pRight->data));
memcpy(varDataVal(output) + varDataLen(left), varDataVal(pRight->data), varDataLen(pRight->data));
varDataSetLen(output, varDataLen(left) + varDataLen(pRight->data));
}
}
#endif
}
static void vectorBitAndHelper(SColumnInfoData *pLeftCol, SColumnInfoData *pRightCol, SColumnInfoData *pOutputCol, static void vectorBitAndHelper(SColumnInfoData *pLeftCol, SColumnInfoData *pRightCol, SColumnInfoData *pOutputCol,
int32_t numOfRows, int32_t step, int32_t i) { int32_t numOfRows, int32_t step, int32_t i) {
_getBigintValue_fn_t getVectorBigintValueFnLeft = getVectorBigintValueFn(pLeftCol->info.type); _getBigintValue_fn_t getVectorBigintValueFnLeft = getVectorBigintValueFn(pLeftCol->info.type);