Merge pull request #17976 from taosdata/feature/3_liaohj
enh(query): optimize the query perf.
This commit is contained in:
commit
0d4239af46
|
@ -130,7 +130,7 @@ void tColDataInit(SColData *pColData, int16_t cid, int8_t type, int8_t smaOn)
|
||||||
void tColDataClear(SColData *pColData);
|
void tColDataClear(SColData *pColData);
|
||||||
int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal);
|
int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal);
|
||||||
void tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal);
|
void tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal);
|
||||||
uint8_t tColDataGetBitValue(SColData *pColData, int32_t iVal);
|
uint8_t tColDataGetBitValue(const SColData *pColData, int32_t iVal);
|
||||||
int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest);
|
int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest);
|
||||||
extern void (*tColDataCalcSMA[])(SColData *pColData, int64_t *sum, int64_t *max, int64_t *min, int16_t *numOfNull);
|
extern void (*tColDataCalcSMA[])(SColData *pColData, int64_t *sum, int64_t *max, int64_t *min, int16_t *numOfNull);
|
||||||
|
|
||||||
|
|
|
@ -1557,7 +1557,7 @@ void tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal) {
|
||||||
tColDataGetValueImpl[pColData->flag](pColData, iVal, pColVal);
|
tColDataGetValueImpl[pColData->flag](pColData, iVal, pColVal);
|
||||||
}
|
}
|
||||||
|
|
||||||
uint8_t tColDataGetBitValue(SColData *pColData, int32_t iVal) {
|
uint8_t tColDataGetBitValue(const SColData *pColData, int32_t iVal) {
|
||||||
uint8_t v;
|
uint8_t v;
|
||||||
switch (pColData->flag) {
|
switch (pColData->flag) {
|
||||||
case HAS_NONE:
|
case HAS_NONE:
|
||||||
|
|
|
@ -893,7 +893,7 @@ static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int or
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SDataBlk* pBlock, int32_t pos) {
|
static int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SDataBlk* pBlock, int32_t pos) {
|
||||||
// NOTE: reverse the order to find the end position in data block
|
// NOTE: reverse the order to find the end position in data block
|
||||||
int32_t endPos = -1;
|
int32_t endPos = -1;
|
||||||
bool asc = ASCENDING_TRAVERSE(pReader->order);
|
bool asc = ASCENDING_TRAVERSE(pReader->order);
|
||||||
|
@ -910,6 +910,117 @@ int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SData
|
||||||
return endPos;
|
return endPos;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void copyPrimaryTsCol(const SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo, SColumnInfoData* pColData,
|
||||||
|
int32_t dumpedRows, bool asc) {
|
||||||
|
if (asc) {
|
||||||
|
memcpy(pColData->pData, &pBlockData->aTSKEY[pDumpInfo->rowIndex], dumpedRows * sizeof(int64_t));
|
||||||
|
} else {
|
||||||
|
int32_t startIndex = pDumpInfo->rowIndex - dumpedRows + 1;
|
||||||
|
memcpy(pColData->pData, &pBlockData->aTSKEY[startIndex], dumpedRows * sizeof(int64_t));
|
||||||
|
|
||||||
|
// todo: opt perf by extract the loop
|
||||||
|
// reverse the array list
|
||||||
|
int32_t mid = dumpedRows >> 1u;
|
||||||
|
int64_t* pts = (int64_t*)pColData->pData;
|
||||||
|
for (int32_t j = 0; j < mid; ++j) {
|
||||||
|
int64_t t = pts[j];
|
||||||
|
pts[j] = pts[dumpedRows - j - 1];
|
||||||
|
pts[dumpedRows - j - 1] = t;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// a faster version of copy procedure.
|
||||||
|
static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo, SColumnInfoData* pColData,
|
||||||
|
int32_t dumpedRows, bool asc) {
|
||||||
|
uint8_t* p = NULL;
|
||||||
|
if (asc) {
|
||||||
|
p = pData->pData + tDataTypes[pData->type].bytes * pDumpInfo->rowIndex;
|
||||||
|
} else {
|
||||||
|
int32_t startIndex = pDumpInfo->rowIndex - dumpedRows + 1;
|
||||||
|
p = pData->pData + tDataTypes[pData->type].bytes * startIndex;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t step = asc? 1:-1;
|
||||||
|
|
||||||
|
// make sure it is aligned to 8bit
|
||||||
|
ASSERT((((uint64_t)pColData->pData) & (0x8 - 1)) == 0);
|
||||||
|
|
||||||
|
// 1. copy data in a batch model
|
||||||
|
memcpy(pColData->pData, p, dumpedRows * tDataTypes[pData->type].bytes);
|
||||||
|
|
||||||
|
// 2. reverse the array list in case of descending order scan data block
|
||||||
|
if (!asc) {
|
||||||
|
switch(pColData->info.type) {
|
||||||
|
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||||
|
case TSDB_DATA_TYPE_DOUBLE:
|
||||||
|
case TSDB_DATA_TYPE_BIGINT:
|
||||||
|
case TSDB_DATA_TYPE_UBIGINT:
|
||||||
|
{
|
||||||
|
int32_t mid = dumpedRows >> 1u;
|
||||||
|
int64_t* pts = (int64_t*)pColData->pData;
|
||||||
|
for (int32_t j = 0; j < mid; ++j) {
|
||||||
|
int64_t t = pts[j];
|
||||||
|
pts[j] = pts[dumpedRows - j - 1];
|
||||||
|
pts[dumpedRows - j - 1] = t;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_BOOL:
|
||||||
|
case TSDB_DATA_TYPE_TINYINT:
|
||||||
|
case TSDB_DATA_TYPE_UTINYINT: {
|
||||||
|
int32_t mid = dumpedRows >> 1u;
|
||||||
|
int8_t* pts = (int8_t*)pColData->pData;
|
||||||
|
for (int32_t j = 0; j < mid; ++j) {
|
||||||
|
int64_t t = pts[j];
|
||||||
|
pts[j] = pts[dumpedRows - j - 1];
|
||||||
|
pts[dumpedRows - j - 1] = t;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_SMALLINT:
|
||||||
|
case TSDB_DATA_TYPE_USMALLINT: {
|
||||||
|
int32_t mid = dumpedRows >> 1u;
|
||||||
|
int16_t* pts = (int16_t*)pColData->pData;
|
||||||
|
for (int32_t j = 0; j < mid; ++j) {
|
||||||
|
int64_t t = pts[j];
|
||||||
|
pts[j] = pts[dumpedRows - j - 1];
|
||||||
|
pts[dumpedRows - j - 1] = t;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_FLOAT:
|
||||||
|
case TSDB_DATA_TYPE_INT:
|
||||||
|
case TSDB_DATA_TYPE_UINT: {
|
||||||
|
int32_t mid = dumpedRows >> 1u;
|
||||||
|
int32_t* pts = (int32_t*)pColData->pData;
|
||||||
|
for (int32_t j = 0; j < mid; ++j) {
|
||||||
|
int64_t t = pts[j];
|
||||||
|
pts[j] = pts[dumpedRows - j - 1];
|
||||||
|
pts[dumpedRows - j - 1] = t;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. if the null value exists, check items one-by-one
|
||||||
|
if (pData->flag != HAS_VALUE) {
|
||||||
|
int32_t rowIndex = 0;
|
||||||
|
|
||||||
|
for (int32_t j = pDumpInfo->rowIndex; rowIndex < dumpedRows; j += step, rowIndex++) {
|
||||||
|
uint8_t v = tColDataGetBitValue(pData, j);
|
||||||
|
if (v == 0 || v == 1) {
|
||||||
|
colDataSetNull_f(pColData->nullbitmap, rowIndex);
|
||||||
|
pColData->hasNull = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
|
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
|
||||||
SReaderStatus* pStatus = &pReader->status;
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
SDataBlockIter* pBlockIter = &pStatus->blockIter;
|
SDataBlockIter* pBlockIter = &pStatus->blockIter;
|
||||||
|
@ -949,24 +1060,17 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
|
||||||
}
|
}
|
||||||
|
|
||||||
endIndex += step;
|
endIndex += step;
|
||||||
int32_t remain = asc ? (endIndex - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex - endIndex);
|
int32_t dumpedRows = asc ? (endIndex - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex - endIndex);
|
||||||
if (remain > pReader->capacity) { // output buffer check
|
if (dumpedRows > pReader->capacity) { // output buffer check
|
||||||
remain = pReader->capacity;
|
dumpedRows = pReader->capacity;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t i = 0;
|
||||||
int32_t rowIndex = 0;
|
int32_t rowIndex = 0;
|
||||||
|
|
||||||
int32_t i = 0;
|
|
||||||
SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
|
SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
|
||||||
if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||||
if (asc) {
|
copyPrimaryTsCol(pBlockData, pDumpInfo, pColData, dumpedRows, asc);
|
||||||
memcpy(pColData->pData, &pBlockData->aTSKEY[pDumpInfo->rowIndex], remain * sizeof(int64_t));
|
|
||||||
} else {
|
|
||||||
for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step) {
|
|
||||||
colDataAppendInt64(pColData, rowIndex++, &pBlockData->aTSKEY[j]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
i += 1;
|
i += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -981,23 +1085,12 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
|
||||||
colIndex += 1;
|
colIndex += 1;
|
||||||
} else if (pData->cid == pColData->info.colId) {
|
} else if (pData->cid == pColData->info.colId) {
|
||||||
if (pData->flag == HAS_NONE || pData->flag == HAS_NULL || pData->flag == (HAS_NULL | HAS_NONE)) {
|
if (pData->flag == HAS_NONE || pData->flag == HAS_NULL || pData->flag == (HAS_NULL | HAS_NONE)) {
|
||||||
colDataAppendNNULL(pColData, 0, remain);
|
colDataAppendNNULL(pColData, 0, dumpedRows);
|
||||||
} else {
|
} else {
|
||||||
if (IS_NUMERIC_TYPE(pColData->info.type) && asc) {
|
if (IS_MATHABLE_TYPE(pColData->info.type)) {
|
||||||
uint8_t* p = pData->pData + tDataTypes[pData->type].bytes * pDumpInfo->rowIndex;
|
copyNumericCols(pData, pDumpInfo, pColData, dumpedRows, asc);
|
||||||
memcpy(pColData->pData, p, remain * tDataTypes[pData->type].bytes);
|
} else { // varchar/nchar type
|
||||||
|
for (int32_t j = pDumpInfo->rowIndex; rowIndex < dumpedRows; j += step) {
|
||||||
// null value exists, check one-by-one
|
|
||||||
if (pData->flag != HAS_VALUE) {
|
|
||||||
for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step, rowIndex++) {
|
|
||||||
uint8_t v = tColDataGetBitValue(pData, j);
|
|
||||||
if (v == 0 || v == 1) {
|
|
||||||
colDataSetNull_f(pColData->nullbitmap, rowIndex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step) {
|
|
||||||
tColDataGetValue(pData, j, &cv);
|
tColDataGetValue(pData, j, &cv);
|
||||||
doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
|
doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
|
||||||
}
|
}
|
||||||
|
@ -1007,7 +1100,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
|
||||||
colIndex += 1;
|
colIndex += 1;
|
||||||
i += 1;
|
i += 1;
|
||||||
} else { // the specified column does not exist in file block, fill with null data
|
} else { // the specified column does not exist in file block, fill with null data
|
||||||
colDataAppendNNULL(pColData, 0, remain);
|
colDataAppendNNULL(pColData, 0, dumpedRows);
|
||||||
i += 1;
|
i += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1015,12 +1108,12 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
|
||||||
// fill the mis-matched columns with null value
|
// fill the mis-matched columns with null value
|
||||||
while (i < numOfOutputCols) {
|
while (i < numOfOutputCols) {
|
||||||
pColData = taosArrayGet(pResBlock->pDataBlock, i);
|
pColData = taosArrayGet(pResBlock->pDataBlock, i);
|
||||||
colDataAppendNNULL(pColData, 0, remain);
|
colDataAppendNNULL(pColData, 0, dumpedRows);
|
||||||
i += 1;
|
i += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pResBlock->info.rows = remain;
|
pResBlock->info.rows = dumpedRows;
|
||||||
pDumpInfo->rowIndex += step * remain;
|
pDumpInfo->rowIndex += step * dumpedRows;
|
||||||
|
|
||||||
// check if current block are all handled
|
// check if current block are all handled
|
||||||
if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pBlock->nRow) {
|
if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pBlock->nRow) {
|
||||||
|
@ -1039,7 +1132,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
|
||||||
int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
|
int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
|
||||||
tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
|
tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
|
||||||
", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
|
", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
|
||||||
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain,
|
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, dumpedRows,
|
||||||
unDumpedRows, pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);
|
unDumpedRows, pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -264,6 +264,7 @@ typedef struct SExchangeInfo {
|
||||||
SLoadRemoteDataInfo loadInfo;
|
SLoadRemoteDataInfo loadInfo;
|
||||||
uint64_t self;
|
uint64_t self;
|
||||||
SLimitInfo limitInfo;
|
SLimitInfo limitInfo;
|
||||||
|
int64_t openedTs; // start exec time stamp
|
||||||
} SExchangeInfo;
|
} SExchangeInfo;
|
||||||
|
|
||||||
typedef struct SScanInfo {
|
typedef struct SScanInfo {
|
||||||
|
|
|
@ -1846,40 +1846,41 @@ static void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
|
|
||||||
SExecTaskInfo* pTaskInfo) {
|
static int32_t getCompletedSources(const SArray* pArray) {
|
||||||
int32_t code = 0;
|
size_t total = taosArrayGetSize(pArray);
|
||||||
int64_t startTs = taosGetTimestampUs();
|
|
||||||
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
|
|
||||||
|
|
||||||
int32_t completed = 0;
|
int32_t completed = 0;
|
||||||
for (int32_t k = 0; k < totalSources; ++k) {
|
for (int32_t k = 0; k < total; ++k) {
|
||||||
SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k);
|
SSourceDataInfo* p = taosArrayGet(pArray, k);
|
||||||
if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
|
if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
|
||||||
completed += 1;
|
completed += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return completed;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
|
||||||
|
SExecTaskInfo* pTaskInfo) {
|
||||||
|
int32_t code = 0;
|
||||||
|
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
|
||||||
|
int32_t completed = getCompletedSources(pExchangeInfo->pSourceDataInfo);
|
||||||
if (completed == totalSources) {
|
if (completed == totalSources) {
|
||||||
setAllSourcesCompleted(pOperator, startTs);
|
setAllSourcesCompleted(pOperator, pExchangeInfo->openedTs);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
// printf("1\n");
|
|
||||||
tsem_wait(&pExchangeInfo->ready);
|
tsem_wait(&pExchangeInfo->ready);
|
||||||
// printf("2\n");
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < totalSources; ++i) {
|
for (int32_t i = 0; i < totalSources; ++i) {
|
||||||
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
|
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
|
||||||
if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
|
if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
|
||||||
// printf("========:%d is completed\n", i);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// printf("index:%d - status:%d\n", i, pDataInfo->status);
|
|
||||||
if (pDataInfo->status != EX_SOURCE_DATA_READY) {
|
if (pDataInfo->status != EX_SOURCE_DATA_READY) {
|
||||||
// printf("-----------%d, status:%d, continue\n", i, pDataInfo->status);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1895,27 +1896,18 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
|
||||||
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
|
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
|
||||||
if (pRsp->numOfRows == 0) {
|
if (pRsp->numOfRows == 0) {
|
||||||
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
|
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
|
||||||
// printf("%d completed, try next\n", i);
|
|
||||||
|
|
||||||
qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64
|
qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64
|
||||||
", totalRows:%" PRIu64 ", completed:%d try next %d/%" PRIzu,
|
", totalRows:%" PRIu64 ", try next %d/%" PRIzu,
|
||||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows,
|
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows,
|
||||||
pExchangeInfo->loadInfo.totalRows, completed, i + 1, totalSources);
|
pExchangeInfo->loadInfo.totalRows, i + 1, totalSources);
|
||||||
taosMemoryFreeClear(pDataInfo->pRsp);
|
taosMemoryFreeClear(pDataInfo->pRsp);
|
||||||
|
break;
|
||||||
// if (completed == totalSources) {
|
|
||||||
// return;
|
|
||||||
// } else {
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
|
SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
|
||||||
int32_t index = 0;
|
int32_t index = 0;
|
||||||
char* pStart = pRetrieveRsp->data;
|
char* pStart = pRetrieveRsp->data;
|
||||||
while (index++ < pRetrieveRsp->numOfBlocks) {
|
while (index++ < pRetrieveRsp->numOfBlocks) {
|
||||||
printf("results, numOfBLock: %d\n", pRetrieveRsp->numOfBlocks);
|
|
||||||
SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false);
|
SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false);
|
||||||
code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
|
code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
@ -1926,25 +1918,16 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
|
||||||
taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
|
taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
|
||||||
}
|
}
|
||||||
|
|
||||||
updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
|
updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pExchangeInfo->openedTs, pOperator);
|
||||||
|
|
||||||
// int32_t completed = 0;
|
|
||||||
if (pRsp->completed == 1) {
|
if (pRsp->completed == 1) {
|
||||||
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
|
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
|
||||||
|
|
||||||
// for (int32_t k = 0; k < totalSources; ++k) {
|
|
||||||
// SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k);
|
|
||||||
// if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
|
|
||||||
// completed += 1;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
|
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
|
||||||
" execId:%d index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
|
" execId:%d index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
|
||||||
", total:%.2f Kb, completed:%d try next %d/%" PRIzu,
|
", total:%.2f Kb, try next %d/%" PRIzu,
|
||||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks,
|
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks,
|
||||||
pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
|
pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
|
||||||
completed, i + 1, totalSources);
|
i + 1, totalSources);
|
||||||
} else {
|
} else {
|
||||||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
|
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
|
||||||
" execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb",
|
" execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb",
|
||||||
|
@ -1962,23 +1945,12 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// if (completed == totalSources) {
|
|
||||||
// setAllSourcesCompleted(pOperator, startTs);
|
|
||||||
// }
|
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
} // end loop
|
||||||
|
|
||||||
int32_t completed = 0;
|
int32_t complete1 = getCompletedSources(pExchangeInfo->pSourceDataInfo);
|
||||||
for (int32_t k = 0; k < totalSources; ++k) {
|
if (complete1 == totalSources) {
|
||||||
SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k);
|
qDebug("all sources are completed, %s", GET_TASKID(pTaskInfo));
|
||||||
if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
|
|
||||||
completed += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (completed == totalSources) {
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2097,6 +2069,7 @@ static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
pExchangeInfo->openedTs = taosGetTimestampUs();
|
||||||
}
|
}
|
||||||
|
|
||||||
OPTR_SET_OPENED(pOperator);
|
OPTR_SET_OPENED(pOperator);
|
||||||
|
|
|
@ -911,6 +911,7 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
|
||||||
case TSDB_DATA_TYPE_FLOAT: {
|
case TSDB_DATA_TYPE_FLOAT: {
|
||||||
float* plist = (float*)pCol->pData;
|
float* plist = (float*)pCol->pData;
|
||||||
|
// float val = 0;
|
||||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -918,8 +919,9 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
|
||||||
numOfElem += 1;
|
numOfElem += 1;
|
||||||
pAvgRes->count += 1;
|
pAvgRes->count += 1;
|
||||||
pAvgRes->sum.dsum += plist[i];
|
pAvgRes->sum.dsum += plist[i];
|
||||||
}
|
}
|
||||||
|
// pAvgRes->sum.dsum = val;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1278,14 +1280,20 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
pBuf->assign = true;
|
pBuf->assign = true;
|
||||||
} else {
|
} else {
|
||||||
// ignore the equivalent data value
|
// ignore the equivalent data value
|
||||||
if ((*val) == pData[i]) {
|
// NOTE: An faster version to avoid one additional comparison with FPU.
|
||||||
continue;
|
if (isMinFunc) { // min
|
||||||
}
|
if (*val > pData[i]) {
|
||||||
|
*val = pData[i];
|
||||||
if ((*val < pData[i]) ^ isMinFunc) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
*val = pData[i];
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
}
|
||||||
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
}
|
||||||
|
} else { // max
|
||||||
|
if (*val < pData[i]) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1309,14 +1317,20 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
pBuf->assign = true;
|
pBuf->assign = true;
|
||||||
} else {
|
} else {
|
||||||
// ignore the equivalent data value
|
// ignore the equivalent data value
|
||||||
if ((*val) == pData[i]) {
|
// NOTE: An faster version to avoid one additional comparison with FPU.
|
||||||
continue;
|
if (isMinFunc) { // min
|
||||||
}
|
if (*val > pData[i]) {
|
||||||
|
*val = pData[i];
|
||||||
if ((*val < pData[i]) ^ isMinFunc) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
*val = pData[i];
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
}
|
||||||
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
}
|
||||||
|
} else { // max
|
||||||
|
if (*val < pData[i]) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1340,14 +1354,20 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
pBuf->assign = true;
|
pBuf->assign = true;
|
||||||
} else {
|
} else {
|
||||||
// ignore the equivalent data value
|
// ignore the equivalent data value
|
||||||
if ((*val) == pData[i]) {
|
// NOTE: An faster version to avoid one additional comparison with FPU.
|
||||||
continue;
|
if (isMinFunc) { // min
|
||||||
}
|
if (*val > pData[i]) {
|
||||||
|
*val = pData[i];
|
||||||
if ((*val < pData[i]) ^ isMinFunc) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
*val = pData[i];
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
}
|
||||||
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
}
|
||||||
|
} else { // max
|
||||||
|
if (*val < pData[i]) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1371,14 +1391,20 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
pBuf->assign = true;
|
pBuf->assign = true;
|
||||||
} else {
|
} else {
|
||||||
// ignore the equivalent data value
|
// ignore the equivalent data value
|
||||||
if ((*val) == pData[i]) {
|
// NOTE: An faster version to avoid one additional comparison with FPU.
|
||||||
continue;
|
if (isMinFunc) { // min
|
||||||
}
|
if (*val > pData[i]) {
|
||||||
|
*val = pData[i];
|
||||||
if ((*val < pData[i]) ^ isMinFunc) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
*val = pData[i];
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
}
|
||||||
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
}
|
||||||
|
} else { // max
|
||||||
|
if (*val < pData[i]) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1404,14 +1430,20 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
pBuf->assign = true;
|
pBuf->assign = true;
|
||||||
} else {
|
} else {
|
||||||
// ignore the equivalent data value
|
// ignore the equivalent data value
|
||||||
if ((*val) == pData[i]) {
|
// NOTE: An faster version to avoid one additional comparison with FPU.
|
||||||
continue;
|
if (isMinFunc) { // min
|
||||||
}
|
if (*val > pData[i]) {
|
||||||
|
*val = pData[i];
|
||||||
if ((*val < pData[i]) ^ isMinFunc) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
*val = pData[i];
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
}
|
||||||
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
}
|
||||||
|
} else { // max
|
||||||
|
if (*val < pData[i]) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1435,14 +1467,20 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
pBuf->assign = true;
|
pBuf->assign = true;
|
||||||
} else {
|
} else {
|
||||||
// ignore the equivalent data value
|
// ignore the equivalent data value
|
||||||
if ((*val) == pData[i]) {
|
// NOTE: An faster version to avoid one additional comparison with FPU.
|
||||||
continue;
|
if (isMinFunc) { // min
|
||||||
}
|
if (*val > pData[i]) {
|
||||||
|
*val = pData[i];
|
||||||
if ((*val < pData[i]) ^ isMinFunc) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
*val = pData[i];
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
}
|
||||||
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
}
|
||||||
|
} else { // max
|
||||||
|
if (*val < pData[i]) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1466,14 +1504,20 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
pBuf->assign = true;
|
pBuf->assign = true;
|
||||||
} else {
|
} else {
|
||||||
// ignore the equivalent data value
|
// ignore the equivalent data value
|
||||||
if ((*val) == pData[i]) {
|
// NOTE: An faster version to avoid one additional comparison with FPU.
|
||||||
continue;
|
if (isMinFunc) { // min
|
||||||
}
|
if (*val > pData[i]) {
|
||||||
|
*val = pData[i];
|
||||||
if ((*val < pData[i]) ^ isMinFunc) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
*val = pData[i];
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
}
|
||||||
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
}
|
||||||
|
} else { // max
|
||||||
|
if (*val < pData[i]) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1497,14 +1541,20 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
pBuf->assign = true;
|
pBuf->assign = true;
|
||||||
} else {
|
} else {
|
||||||
// ignore the equivalent data value
|
// ignore the equivalent data value
|
||||||
if ((*val) == pData[i]) {
|
// NOTE: An faster version to avoid one additional comparison with FPU.
|
||||||
continue;
|
if (isMinFunc) { // min
|
||||||
}
|
if (*val > pData[i]) {
|
||||||
|
*val = pData[i];
|
||||||
if ((*val < pData[i]) ^ isMinFunc) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
*val = pData[i];
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
}
|
||||||
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
}
|
||||||
|
} else { // max
|
||||||
|
if (*val < pData[i]) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1529,14 +1579,20 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
pBuf->assign = true;
|
pBuf->assign = true;
|
||||||
} else {
|
} else {
|
||||||
// ignore the equivalent data value
|
// ignore the equivalent data value
|
||||||
if ((*val) == pData[i]) {
|
// NOTE: An faster version to avoid one additional comparison with FPU.
|
||||||
continue;
|
if (isMinFunc) { // min
|
||||||
}
|
if (*val > pData[i]) {
|
||||||
|
*val = pData[i];
|
||||||
if ((*val < pData[i]) ^ isMinFunc) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
*val = pData[i];
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
}
|
||||||
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
}
|
||||||
|
} else { // max
|
||||||
|
if (*val < pData[i]) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1559,7 +1615,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
}
|
}
|
||||||
pBuf->assign = true;
|
pBuf->assign = true;
|
||||||
} else {
|
} else {
|
||||||
// ignore the equivalent data value
|
#if 0
|
||||||
if ((*val) == pData[i]) {
|
if ((*val) == pData[i]) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -1570,6 +1626,23 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
// NOTE: An faster version to avoid one additional comparison with FPU.
|
||||||
|
if (isMinFunc) { // min
|
||||||
|
if (*val > pData[i]) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else { // max
|
||||||
|
if (*val < pData[i]) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
numOfElems += 1;
|
numOfElems += 1;
|
||||||
|
@ -2934,6 +3007,7 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#else
|
#else
|
||||||
|
int64_t* pts = (int64_t*) pInput->pPTS->pData;
|
||||||
for (int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
|
for (int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
|
||||||
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
|
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -2942,13 +3016,14 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
|
||||||
numOfElems++;
|
numOfElems++;
|
||||||
|
|
||||||
char* data = colDataGetData(pInputCol, i);
|
char* data = colDataGetData(pInputCol, i);
|
||||||
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
TSKEY cts = pts[i];
|
||||||
if (pResInfo->numOfRes == 0 || pInfo->ts > cts) {
|
if (pResInfo->numOfRes == 0 || pInfo->ts > cts) {
|
||||||
doSaveCurrentVal(pCtx, i, cts, pInputCol->info.type, data);
|
doSaveCurrentVal(pCtx, i, cts, pInputCol->info.type, data);
|
||||||
pResInfo->numOfRes = 1;
|
pResInfo->numOfRes = 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
if (numOfElems == 0) {
|
if (numOfElems == 0) {
|
||||||
// save selectivity value for column consisted of all null values
|
// save selectivity value for column consisted of all null values
|
||||||
firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
|
firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
|
||||||
|
@ -3020,6 +3095,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#else
|
#else
|
||||||
|
int64_t* pts = (int64_t*)pInput->pPTS->pData;
|
||||||
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
|
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
|
||||||
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
|
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -3028,15 +3104,16 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
||||||
numOfElems++;
|
numOfElems++;
|
||||||
|
|
||||||
char* data = colDataGetData(pInputCol, i);
|
char* data = colDataGetData(pInputCol, i);
|
||||||
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
TSKEY cts = pts[i];
|
||||||
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
|
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
|
||||||
doSaveCurrentVal(pCtx, i, cts, type, data);
|
doSaveCurrentVal(pCtx, i, cts, type, data);
|
||||||
pResInfo->numOfRes = 1;
|
pResInfo->numOfRes = 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
// save selectivity value for column consisted of all null values
|
||||||
if (numOfElems == 0) {
|
if (numOfElems == 0) {
|
||||||
// save selectivity value for column consisted of all null values
|
|
||||||
firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
|
firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
|
||||||
}
|
}
|
||||||
SET_VAL(pResInfo, numOfElems, 1);
|
SET_VAL(pResInfo, numOfElems, 1);
|
||||||
|
@ -3216,11 +3293,13 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#else
|
#else
|
||||||
|
|
||||||
|
int64_t* pts = (int64_t*)pInput->pPTS->pData;
|
||||||
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
|
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
|
||||||
char* data = colDataGetData(pInputCol, i);
|
char* data = colDataGetData(pInputCol, i);
|
||||||
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
TSKEY cts = pts[i];
|
||||||
numOfElems++;
|
|
||||||
|
|
||||||
|
numOfElems++;
|
||||||
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
|
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
|
||||||
doSaveLastrow(pCtx, data, i, cts, pInfo);
|
doSaveLastrow(pCtx, data, i, cts, pInfo);
|
||||||
pResInfo->numOfRes = 1;
|
pResInfo->numOfRes = 1;
|
||||||
|
|
Loading…
Reference in New Issue