fix:add log

parent e2f4a2a03c
commit 873546bee5
@@ -118,7 +118,7 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int
    p->groupId = *(uint64_t*) key;
    p->pos = *(SResultRowPosition*) pData;
    memcpy(p->key, (char*)key + sizeof(uint64_t), keyLen - sizeof(uint64_t));

+   qDebug("page_groupRes, groupId:%"PRIu64",pageId:%d,offset:%d\n", p->groupId, p->pos.pageId, p->pos.offset);
    taosArrayPush(pGroupResInfo->pRows, &p);
  }
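The loop above splits a composite hash key whose first eight bytes are the group id, with the remaining bytes forming the raw key. Below is a small plain-C sketch of packing and unpacking such a key; the names and values are hypothetical, not TDengine code.

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Minimal sketch (hypothetical names): the hash key is a uint64_t group id
 * followed by the raw key bytes, so unpacking is a cast for the id plus a
 * memcpy for the remainder, mirroring initGroupedResultInfo above. */
int main(void) {
  uint64_t   groupId = 42;
  const char rawKey[] = "device-7";

  uint64_t storage[8] = {0};        // keeps the buffer aligned for the cast below
  char*    key = (char*)storage;
  size_t   keyLen = sizeof(uint64_t) + sizeof(rawKey);

  // pack: [groupId][raw key bytes]
  memcpy(key, &groupId, sizeof(uint64_t));
  memcpy(key + sizeof(uint64_t), rawKey, sizeof(rawKey));

  // unpack
  uint64_t gid = *(uint64_t*)key;
  char     rest[56] = {0};
  memcpy(rest, key + sizeof(uint64_t), keyLen - sizeof(uint64_t));
  printf("groupId:%" PRIu64 ", key:%s\n", gid, rest);
  return 0;
}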
@@ -608,6 +608,7 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu
      }
    }

+   qDebug("page_setSelect num:%d", num);
    if (p != NULL) {
      p->subsidiaries.pCtx = pValCtx;
      p->subsidiaries.num = num;
@@ -664,7 +665,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
    } else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN || pExpr->pExpr->nodeType == QUERY_NODE_OPERATOR ||
               pExpr->pExpr->nodeType == QUERY_NODE_VALUE) {
      // for simple column, the result buffer needs to hold at least one element.
-     pCtx->resDataInfo.interBufSize = pFunct->resSchema.bytes;
+     pCtx->resDataInfo.interBufSize = pFunct->resSchema.bytes + sizeof(bool); // sizeof(bool) marks if data is null
    }

    pCtx->input.numOfInputCols = pFunct->numOfParams;
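The replaced line reserves one extra byte per value so that a NULL can be recorded next to the payload in the intermediate result buffer. A minimal sketch of that layout in plain C, with assumed names rather than the TDengine structs:

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Minimal sketch (assumed layout): the intermediate buffer for a simple
 * column holds the value bytes plus one trailing bool that marks whether
 * the value is NULL, matching "resSchema.bytes + sizeof(bool)" above. */
typedef struct {
  int32_t bytes;    // width of the value itself
  char    buf[32];  // value bytes, followed by the null flag
} InterBufSlot;

static void slotSetValue(InterBufSlot* s, const void* p) {
  memcpy(s->buf, p, s->bytes);
  *(bool*)(s->buf + s->bytes) = false;
}

static void slotSetNull(InterBufSlot* s) { *(bool*)(s->buf + s->bytes) = true; }
static bool slotIsNull(const InterBufSlot* s) { return *(const bool*)(s->buf + s->bytes); }

int main(void) {
  InterBufSlot s = { .bytes = sizeof(double) };
  double v = 1.5;
  slotSetValue(&s, &v);
  printf("null? %d\n", slotIsNull(&s));   // 0
  slotSetNull(&s);
  printf("null? %d\n", slotIsNull(&s));   // 1
  return 0;
}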
@@ -275,6 +275,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
    // 1. close current opened time window
    if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId &&
                                              pResult->offset != pResultRowInfo->cur.offset))) {
+     qDebug("page_1");
      SResultRowPosition pos = pResultRowInfo->cur;
      SFilePage* pPage = getBufPage(pResultBuf, pos.pageId);
      releaseBufPage(pResultBuf, pPage);
@@ -282,6 +283,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR

    // allocate a new buffer page
    if (pResult == NULL) {
+     qDebug("page_2");
      ASSERT(pSup->resultRowSize > 0);
      pResult = getNewResultRow(pResultBuf, groupId, pSup->resultRowSize);
@@ -539,7 +541,7 @@ static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunct
      if (pCtx[k].fpSet.process == NULL) {
        continue;
      }

+     qDebug("page_process");
      int32_t code = pCtx[k].fpSet.process(&pCtx[k]);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
@@ -1415,6 +1417,7 @@ void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t
      return;
    }

+   qDebug("page_setbuf, groupId:%"PRIu64, groupId);
    doSetTableGroupOutputBuf(pOperator, pAggInfo, numOfOutput, groupId);

    // record the current active group id
@@ -1490,10 +1493,12 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
                           int32_t numOfExprs) {
    int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
    int32_t start = pGroupResInfo->index;
+   qDebug("\npage_copytoblock rows:%d", numOfRows);

    for (int32_t i = start; i < numOfRows; i += 1) {
      SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
      SFilePage*  page = getBufPage(pBuf, pPos->pos.pageId);
+     qDebug("page_copytoblock pos pageId:%d, offset:%d", pPos->pos.pageId, pPos->pos.offset);

      SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);
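The copy loop resolves each result row from the (pageId, offset) pair recorded earlier: fetch the page, then add the byte offset. A self-contained sketch of that addressing scheme in plain C, with an in-memory page pool standing in for the real disk-based buffer (assumed names, not the TDengine API):

#include <stdint.h>
#include <stdio.h>

/* Minimal sketch: result rows are addressed by (pageId, offset), and the
 * row is recovered by fetching the page and adding the byte offset. */
typedef struct { int32_t pageId; int32_t offset; } RowPos;
typedef struct { char data[4096]; } Page;
typedef struct { int32_t numOfRows; double sum; } ResultRow;

static Page* get_page(Page* pool, int32_t pageId) { return &pool[pageId]; }

int main(void) {
  Page pool[4] = {0};

  // store a row at page 2, offset 128
  RowPos pos = { .pageId = 2, .offset = 128 };
  ResultRow* w = (ResultRow*)(get_page(pool, pos.pageId)->data + pos.offset);
  w->numOfRows = 7;
  w->sum = 3.5;

  // later: resolve the position back into a row pointer
  ResultRow* r = (ResultRow*)(get_page(pool, pos.pageId)->data + pos.offset);
  printf("rows=%d sum=%f\n", r->numOfRows, r->sum);
  return 0;
}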
@@ -1526,6 +1531,8 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI

      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowCellOffset);
      if (pCtx[j].fpSet.finalize) {
+       qDebug("\npage_finalize %d", numOfExprs);
+
        int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
        if (TAOS_FAILED(code)) {
          qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
@@ -1554,9 +1561,9 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI

      releaseBufPage(pBuf, page);
      pBlock->info.rows += pRow->numOfRows;
-     if (pBlock->info.rows >= pBlock->info.capacity) {  // output buffer is full
-       break;
-     }
+     // if (pBlock->info.rows >= pBlock->info.capacity) {  // output buffer is full
+     //   break;
+     // }
    }

    qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
@@ -3417,11 +3424,12 @@ int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInf
  }

  void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows) {
    ASSERT(numOfRows != 0);
    pOperator->resultInfo.capacity = numOfRows;
    pOperator->resultInfo.threshold = numOfRows * 0.75;

    if (pOperator->resultInfo.threshold == 0) {
      pOperator->resultInfo.capacity = numOfRows;
      pOperator->resultInfo.threshold = numOfRows;
    }
  }
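initResultSizeInfo sets the flush threshold to 75% of the requested capacity and falls back to the full row count when integer truncation yields zero. A compilable sketch of just that rule, with assumed struct names:

#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/* Minimal sketch of the sizing rule shown above. */
typedef struct { int32_t capacity; int32_t threshold; } ResultInfo;

static void init_result_size(ResultInfo* info, int32_t numOfRows) {
  assert(numOfRows != 0);
  info->capacity  = numOfRows;
  info->threshold = (int32_t)(numOfRows * 0.75);
  if (info->threshold == 0) {       // very small capacity: truncation hit zero
    info->capacity  = numOfRows;
    info->threshold = numOfRows;
  }
}

int main(void) {
  ResultInfo a, b;
  init_result_size(&a, 4096);  // threshold 3072
  init_result_size(&b, 1);     // 0.75 truncates to 0 -> falls back to 1
  printf("%d/%d  %d/%d\n", a.threshold, a.capacity, b.threshold, b.capacity);
  return 0;
}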
@@ -2051,6 +2051,7 @@ int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle
      qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId);
      return TSDB_CODE_SUCCESS;
    }
    pTableListInfo->needSortTableByGroupId = pTableScanNode->groupSort;
    code = generateGroupIdMap(pTableListInfo, pHandle, pTableScanNode->pGroupTags);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
@@ -24,6 +24,7 @@
  #include "tglobal.h"
  #include "thistogram.h"
  #include "tpercentile.h"
+ #include "query.h"

  #define HISTOGRAM_MAX_BINS_NUM 1000
  #define MAVG_MAX_POINTS_NUM 1000
@@ -1458,7 +1459,7 @@ void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuple
    int32_t offset = pTuplePos->offset;

    if (pTuplePos->pageId != -1) {
-     int32_t numOfCols = taosArrayGetSize(pCtx->pSrcBlock->pDataBlock);
+     int32_t numOfCols = pCtx->subsidiaries.num;
      SFilePage* pPage = getBufPage(pCtx->pBuf, pageId);

      bool* nullList = (bool*)((char*)pPage + offset);
@@ -1469,21 +1470,18 @@ void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuple
        SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];

        SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0];
        int32_t      srcSlotId = pFuncParam->pCol->slotId;
        int32_t      dstSlotId = pc->pExpr->base.resSchema.slotId;

-       int32_t ps = 0;
-       for (int32_t k = 0; k < srcSlotId; ++k) {
-         SColumnInfoData* pSrcCol = taosArrayGet(pCtx->pSrcBlock->pDataBlock, k);
-         ps += pSrcCol->info.bytes;
-       }

        SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
-       if (nullList[srcSlotId]) {
+       ASSERT(pc->pExpr->base.resSchema.bytes == pDstCol->info.bytes);
+       if (nullList[j]) {
          colDataAppendNULL(pDstCol, rowIndex);
        } else {
-         colDataAppend(pDstCol, rowIndex, (pStart + ps), false);
+         colDataAppend(pDstCol, rowIndex, pStart, false);
        }
+       pStart += pDstCol->info.bytes;
      }

      releaseBufPage(pCtx->pBuf, pPage);
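After this rewrite, a saved tuple stores one null flag per subsidiary column followed by the payloads packed back to back, and the reader walks them in order, advancing by each destination column's width. A self-contained sketch of that read path in plain C, with assumed widths instead of the TDengine column types:

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Minimal sketch (assumed layout): [nulls per column][payloads...],
 * read back with a linear walk as in setSelectivityValue above. */
int main(void) {
  enum { NUM = 2 };
  int32_t widths[NUM] = { sizeof(int32_t), sizeof(double) };

  // pack: [nulls][int32 payload][double payload]
  char  buf[64] = {0};
  bool* nulls = (bool*)buf;
  char* pStart = (char*)(nulls + NUM);
  int32_t v = 42;
  memcpy(pStart, &v, sizeof(v));
  nulls[1] = true;   // second subsidiary value is NULL

  // unpack
  pStart = (char*)(nulls + NUM);
  for (int32_t j = 0; j < NUM; ++j) {
    if (nulls[j]) {
      printf("col %d: NULL\n", j);
    } else if (widths[j] == sizeof(int32_t)) {
      int32_t out;
      memcpy(&out, pStart, sizeof(out));
      printf("col %d: %d\n", j, out);
    }
    pStart += widths[j];
  }
  return 0;
}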
@@ -2407,7 +2405,7 @@ bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {

  bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
    SColumnNode* pNode = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
-   pEnv->calcMemSize = pNode->node.resType.bytes;
+   pEnv->calcMemSize = pNode->node.resType.bytes + sizeof(bool); // sizeof(bool) marks if data is null
    return true;
  }
@@ -3033,6 +3031,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
      // save the data of this tuple
      saveTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos);

+     qDebug("page_saveTuple i:%d, item:%p,pageId:%d, offset:%d\n", pEntryInfo->numOfRes, pItem, pItem->tuplePos.pageId, pItem->tuplePos.offset);
      // allocate the buffer and keep the data of this row into the new allocated buffer
      pEntryInfo->numOfRes++;
      taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn,
@@ -3051,6 +3050,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData

        // save the data of this tuple by over writing the old data
        copyTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos);
+       qDebug("page_copyTuple pageId:%d, offset:%d", pItem->tuplePos.pageId, pItem->tuplePos.offset);
        taosheapadjust((void*)pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void*)&type,
                       topBotResComparFn, NULL, !isTopQuery);
      }
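Both doAddIntoResult branches keep the current top/bottom set in heap order: a new candidate overwrites the weakest entry, then the heap is re-adjusted. A plain-C sketch of the same top-k idea (illustrative only, not the taosheap* API):

#include <stdio.h>

/* Minimal top-k sketch: keep the k largest values in a min-heap; when a new
 * value beats the heap root, overwrite the root and sift it down, which is
 * what the heap-adjust call after copyTupleData accomplishes. */
static void sift_down(int* h, int n, int i) {
  for (;;) {
    int s = i, l = 2 * i + 1, r = 2 * i + 2;
    if (l < n && h[l] < h[s]) s = l;
    if (r < n && h[r] < h[s]) s = r;
    if (s == i) return;
    int t = h[i]; h[i] = h[s]; h[s] = t;
    i = s;
  }
}

int main(void) {
  enum { K = 3 };
  int heap[K] = { 5, 9, 7 };            // current top-3, already a min-heap
  int incoming[] = { 4, 12, 8, 1, 20 };

  for (int i = 0; i < 5; ++i) {
    if (incoming[i] > heap[0]) {        // beats the smallest of the top-k
      heap[0] = incoming[i];            // overwrite, then restore heap order
      sift_down(heap, K, 0);
    }
  }
  printf("top-3 (any order): %d %d %d\n", heap[0], heap[1], heap[2]);
  return 0;
}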
@@ -3060,7 +3060,11 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
  void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) {
    SFilePage* pPage = NULL;

-   int32_t completeRowSize = pSrcBlock->info.rowSize + (int32_t) taosArrayGetSize(pSrcBlock->pDataBlock) * sizeof(bool);
+   int32_t completeRowSize = pCtx->subsidiaries.num * sizeof(bool);
+   for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
+     SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
+     completeRowSize += pc->pExpr->base.resSchema.bytes;
+   }

    if (pCtx->curBufPage == -1) {
      pPage = getNewBufPage(pCtx->pBuf, 0, &pCtx->curBufPage);
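The row size is now derived from the subsidiary columns only: one null flag each plus the sum of their result widths, instead of the full source-block row size. A tiny compilable sketch of that computation, using made-up widths:

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Minimal sketch of the new completeRowSize rule above; the widths stand in
 * for each subsidiary's resSchema.bytes. */
int main(void) {
  int32_t widths[] = { 8, 4, 16 };  // hypothetical subsidiary result widths
  int32_t num = 3;

  int32_t completeRowSize = num * (int32_t)sizeof(bool);
  for (int32_t j = 0; j < num; ++j) {
    completeRowSize += widths[j];
  }
  printf("completeRowSize = %d bytes\n", completeRowSize);  // 3 + 28 = 31
  return 0;
}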
@@ -3078,19 +3082,22 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
    // keep the current row data, extract method
    int32_t offset = 0;
    bool*   nullList = (bool*)((char*)pPage + pPage->num);
-   char*   pStart = (char*)(nullList + sizeof(bool) * (int32_t) taosArrayGetSize(pSrcBlock->pDataBlock));
-   for (int32_t i = 0; i < (int32_t) taosArrayGetSize(pSrcBlock->pDataBlock); ++i) {
-     SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, i);
-     bool isNull = colDataIsNull_s(pCol, rowIndex);
-     if (isNull) {
-       nullList[i] = true;
+   char*   pStart = (char*)(nullList + sizeof(bool) * pCtx->subsidiaries.num);
+   for (int32_t i = 0; i < pCtx->subsidiaries.num; ++i) {
+     SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[i];
+
+     SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0];
+     int32_t      srcSlotId = pFuncParam->pCol->slotId;
+
+     SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId);
+     if ((nullList[i] = colDataIsNull_s(pCol, rowIndex)) == true) {
        offset += pCol->info.bytes;
        continue;
      }

      char* p = colDataGetData(pCol, rowIndex);
      if (IS_VAR_DATA_TYPE(pCol->info.type)) {
-       memcpy(pStart + offset, p, varDataTLen(p));
+       memcpy(pStart + offset, p, (pCol->info.type == TSDB_DATA_TYPE_JSON) ? getJsonValueLen(p): varDataTLen(p));
      } else {
        memcpy(pStart + offset, p, pCol->info.bytes);
      }
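The packing loop writes the null-flag list first and then appends each non-NULL payload at a running offset; variable-length values copy their encoded length (with a JSON special case in the real code) rather than a fixed width. A minimal plain-C sketch of the fixed-width part of that layout, with an assumed column model:

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Minimal sketch (assumed column model): write one null flag per tracked
 * column, then append each non-NULL payload at a running offset, as
 * saveTupleData does above. Only fixed-width columns are shown here. */
typedef struct {
  int32_t bytes;
  bool    isNull;
  char    value[8];
} Col;

int main(void) {
  Col cols[2] = {
    { .bytes = 4, .isNull = false },
    { .bytes = 8, .isNull = true  },
  };
  int32_t v = 99;
  memcpy(cols[0].value, &v, sizeof(v));

  char  page[64] = {0};
  bool* nullList = (bool*)page;
  char* pStart   = (char*)(nullList + 2);   // payload area starts after the flags
  int32_t offset = 0;

  for (int32_t i = 0; i < 2; ++i) {
    if ((nullList[i] = cols[i].isNull) == true) {
      offset += cols[i].bytes;              // reserve the slot even for NULLs
      continue;
    }
    memcpy(pStart + offset, cols[i].value, cols[i].bytes);
    offset += cols[i].bytes;
  }
  printf("packed %d payload bytes after %zu flag bytes\n", offset, 2 * sizeof(bool));
  return 0;
}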
@@ -3108,14 +3115,18 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
  void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) {
    SFilePage* pPage = getBufPage(pCtx->pBuf, pPos->pageId);

-   int32_t numOfCols = taosArrayGetSize(pSrcBlock->pDataBlock);
+   int32_t numOfCols = pCtx->subsidiaries.num;

    bool* nullList = (bool*)((char*)pPage + pPos->offset);
    char* pStart = (char*)(nullList + numOfCols * sizeof(bool));

    int32_t offset = 0;
    for (int32_t i = 0; i < numOfCols; ++i) {
-     SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, i);
+     SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[i];
+     SFunctParam*    pFuncParam = &pc->pExpr->base.pParam[0];
+     int32_t         srcSlotId = pFuncParam->pCol->slotId;
+
+     SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId);
      if ((nullList[i] = colDataIsNull_s(pCol, rowIndex)) == true) {
        offset += pCol->info.bytes;
        continue;
@@ -3123,7 +3134,7 @@ void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS

      char* p = colDataGetData(pCol, rowIndex);
      if (IS_VAR_DATA_TYPE(pCol->info.type)) {
-       memcpy(pStart + offset, p, varDataTLen(p));
+       memcpy(pStart + offset, p, (pCol->info.type == TSDB_DATA_TYPE_JSON) ? getJsonValueLen(p): varDataTLen(p));
      } else {
        memcpy(pStart + offset, p, pCol->info.bytes);
      }
@@ -3154,7 +3165,7 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
      } else {
        colDataAppend(pCol, currentRow, (const char*)&pItem->v.i, false);
      }

+     qDebug("page_finalize i:%d,item:%p,pageId:%d, offset:%d\n", i, pItem, pItem->tuplePos.pageId, pItem->tuplePos.offset);
      setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow);
      currentRow += 1;
    }
@@ -440,6 +440,7 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) {
    }

    ((void**)pi->pData)[0] = pi;
+   uDebug("page_getNewBufPage pageId:%d, offset:%"PRId64, pi->pageId, pi->offset);
    return (void*)(GET_DATA_PAYLOAD(pi));
  }

@@ -463,6 +464,7 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
      lruListMoveToFront(pBuf->lruList, (*pi));
      (*pi)->used = true;

+     uDebug("page_getBufPage1 pageId:%d, offset:%"PRId64, (*pi)->pageId, (*pi)->offset);
      return (void*)(GET_DATA_PAYLOAD(*pi));
    } else {  // not in memory
      assert((*pi)->pData == NULL && (*pi)->pn == NULL && (((*pi)->length >= 0 && (*pi)->offset >= 0) || ((*pi)->length == -1 && (*pi)->offset == -1)));
@@ -494,7 +496,7 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
        return NULL;
      }
    }

+   uDebug("page_getBufPage2 pageId:%d, offset:%"PRId64, (*pi)->pageId, (*pi)->offset);
    return (void*)(GET_DATA_PAYLOAD(*pi));
  }
}
@@ -506,8 +508,10 @@ void releaseBufPage(SDiskbasedBuf* pBuf, void* page) {
  }

  void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) {
-   assert(pi->pData != NULL && pi->used == true);
+   uDebug("page_releaseBufPageInfo pageId:%d, used:%d, offset:%"PRId64, pi->pageId, pi->used, pi->offset);
+   assert(pi->pData != NULL && pi->used == true);
+   // assert(pi->pData != NULL);
    pi->used = false;
    pBuf->statis.releasePages += 1;
  }

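The paged-buffer changes above tag every page allocation, fetch, and release with the page id and its file offset, so a page's lifetime can be traced from the log alone. A plain-C sketch of that logging pattern, with assumed types rather than the real SDiskbasedBuf API:

#include <stdint.h>
#include <stdio.h>

/* Minimal sketch (assumed types): log page id and file offset on every
 * get/release, mirroring the uDebug calls added in this file. */
typedef struct {
  int32_t pageId;
  int64_t offset;   // position of the page in the backing file, -1 if never flushed
  int     used;
} PageInfo;

static void log_event(const char* what, const PageInfo* pi) {
  fprintf(stderr, "%s pageId:%d, offset:%lld\n", what, pi->pageId, (long long)pi->offset);
}

static void* get_page(PageInfo* pi, char* payload) {
  pi->used = 1;
  log_event("page_get", pi);      // compare uDebug("page_getBufPage1 ...")
  return payload;
}

static void release_page(PageInfo* pi) {
  log_event("page_release", pi);  // compare uDebug("page_releaseBufPageInfo ...")
  pi->used = 0;
}

int main(void) {
  char     payload[4096];
  PageInfo pi = { .pageId = 3, .offset = -1, .used = 0 };
  void*    p = get_page(&pi, payload);
  (void)p;
  release_page(&pi);
  return 0;
}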
@@ -38,7 +38,10 @@ class TDTestCase:
          tdSql.init(conn.cursor(), logSql)

      def run(self):
-         tdSql.prepare()
+         # tdSql.prepare()
+         tdSql.execute('drop database if exists db')
+         tdSql.execute('create database db vgroups 1')
+         tdSql.execute('use db')
          print("============== STEP 1 ===== prepare data & validate json string")
          tdSql.error("create table if not exists jsons1(ts timestamp, dataInt int, dataBool bool, dataStr nchar(50), dataStrBin binary(150)) tags(jtag json, tagint int)")
          tdSql.error("create table if not exists jsons1(ts timestamp, data json) tags(tagint int)")