refactor(query): do some internal refactor and add some logs.
This commit is contained in:
parent
fc7887e6b7
commit
abfa6d9162
|
@ -233,7 +233,6 @@ struct SVnodeCfg {
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
TSKEY lastKey;
|
|
||||||
uint64_t uid;
|
uint64_t uid;
|
||||||
uint64_t groupId;
|
uint64_t groupId;
|
||||||
} STableKeyInfo;
|
} STableKeyInfo;
|
||||||
|
|
|
@ -270,7 +270,7 @@ int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, uid = id};
|
STableKeyInfo info = {uid = id};
|
||||||
taosArrayPush(list, &info);
|
taosArrayPush(list, &info);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -324,7 +324,7 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < taosArrayGetSize(res); i++) {
|
for (int i = 0; i < taosArrayGetSize(res); i++) {
|
||||||
STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, .uid = *(uint64_t*)taosArrayGet(res, i), .groupId = 0};
|
STableKeyInfo info = {.uid = *(uint64_t*)taosArrayGet(res, i), .groupId = 0};
|
||||||
taosArrayPush(pListInfo->pTableList, &info);
|
taosArrayPush(pListInfo->pTableList, &info);
|
||||||
}
|
}
|
||||||
taosArrayDestroy(res);
|
taosArrayDestroy(res);
|
||||||
|
@ -338,7 +338,7 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
} else { // Create one table group.
|
} else { // Create one table group.
|
||||||
STableKeyInfo info = {.lastKey = 0, .uid = tableUid, .groupId = 0};
|
STableKeyInfo info = {.uid = tableUid, .groupId = 0};
|
||||||
taosArrayPush(pListInfo->pTableList, &info);
|
taosArrayPush(pListInfo->pTableList, &info);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -174,7 +174,7 @@ static SArray* filterQualifiedChildTables(const SStreamScanInfo* pScanInfo, cons
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
metaReaderInit(&mr, pScanInfo->readHandle.meta, 0);
|
metaReaderInit(&mr, pScanInfo->readHandle.meta, 0);
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) {
|
||||||
int64_t* id = (int64_t*)taosArrayGet(tableIdList, i);
|
uint64_t* id = (uint64_t*)taosArrayGet(tableIdList, i);
|
||||||
|
|
||||||
int32_t code = metaGetTableEntryByUid(&mr, *id);
|
int32_t code = metaGetTableEntryByUid(&mr, *id);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -189,7 +189,7 @@ static SArray* filterQualifiedChildTables(const SStreamScanInfo* pScanInfo, cons
|
||||||
|
|
||||||
if (pScanInfo->pTagCond != NULL) {
|
if (pScanInfo->pTagCond != NULL) {
|
||||||
bool qualified = false;
|
bool qualified = false;
|
||||||
STableKeyInfo info = {.groupId = 0, .uid = mr.me.uid, .lastKey = 0};
|
STableKeyInfo info = {.groupId = 0, .uid = mr.me.uid};
|
||||||
code = isTableOk(&info, pScanInfo->pTagCond, pScanInfo->readHandle.meta, &qualified);
|
code = isTableOk(&info, pScanInfo->pTagCond, pScanInfo->readHandle.meta, &qualified);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("failed to filter new table, uid:0x%" PRIx64 ", %s", info.uid, idstr);
|
qError("failed to filter new table, uid:0x%" PRIx64 ", %s", info.uid, idstr);
|
||||||
|
@ -201,9 +201,7 @@ static SArray* filterQualifiedChildTables(const SStreamScanInfo* pScanInfo, cons
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*pScanInfo->pStreamScanOp->pTaskInfo->tableqinfoList.*/
|
|
||||||
// handle multiple partition
|
// handle multiple partition
|
||||||
|
|
||||||
taosArrayPush(qa, id);
|
taosArrayPush(qa, id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -227,6 +225,19 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
|
||||||
|
|
||||||
qDebug(" %d qualified child tables added into stream scanner", (int32_t)taosArrayGetSize(qa));
|
qDebug(" %d qualified child tables added into stream scanner", (int32_t)taosArrayGetSize(qa));
|
||||||
code = tqReaderAddTbUidList(pScanInfo->tqReader, qa);
|
code = tqReaderAddTbUidList(pScanInfo->tqReader, qa);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
// add to qTaskInfo
|
||||||
|
// todo refactor STableList
|
||||||
|
for(int32_t i = 0; i < taosArrayGetSize(qa); ++i) {
|
||||||
|
uint64_t* uid = taosArrayGet(qa, i);
|
||||||
|
|
||||||
|
STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0};
|
||||||
|
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo);
|
||||||
|
}
|
||||||
|
|
||||||
taosArrayDestroy(qa);
|
taosArrayDestroy(qa);
|
||||||
} else { // remove the table id in current list
|
} else { // remove the table id in current list
|
||||||
qDebug(" %d remove child tables from the stream scanner", (int32_t)taosArrayGetSize(tableIdList));
|
qDebug(" %d remove child tables from the stream scanner", (int32_t)taosArrayGetSize(tableIdList));
|
||||||
|
|
|
@ -4455,7 +4455,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
} else { // Create one table group.
|
} else { // Create one table group.
|
||||||
STableKeyInfo info = {.lastKey = 0, .uid = pBlockNode->uid, .groupId = 0};
|
STableKeyInfo info = {.uid = pBlockNode->uid, .groupId = 0};
|
||||||
taosArrayPush(pTableListInfo->pTableList, &info);
|
taosArrayPush(pTableListInfo->pTableList, &info);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2464,9 +2464,7 @@ bool apercentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResult
|
||||||
int32_t apercentileFunction(SqlFunctionCtx* pCtx) {
|
int32_t apercentileFunction(SqlFunctionCtx* pCtx) {
|
||||||
int32_t numOfElems = 0;
|
int32_t numOfElems = 0;
|
||||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
|
||||||
SInputColumnInfoData* pInput = &pCtx->input;
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
// SColumnDataAgg* pAgg = pInput->pColumnDataAgg[0];
|
|
||||||
|
|
||||||
SColumnInfoData* pCol = pInput->pData[0];
|
SColumnInfoData* pCol = pInput->pData[0];
|
||||||
int32_t type = pCol->info.type;
|
int32_t type = pCol->info.type;
|
||||||
|
@ -2499,6 +2497,9 @@ int32_t apercentileFunction(SqlFunctionCtx* pCtx) {
|
||||||
GET_TYPED_DATA(v, double, type, data);
|
GET_TYPED_DATA(v, double, type, data);
|
||||||
tHistogramAdd(&pInfo->pHisto, v);
|
tHistogramAdd(&pInfo->pHisto, v);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
qDebug("add %d elements into histogram, total:%d, numOfEntry:%d, %p", numOfElems, pInfo->pHisto->numOfElems,
|
||||||
|
pInfo->pHisto->numOfEntries, pInfo->pHisto);
|
||||||
}
|
}
|
||||||
|
|
||||||
SET_VAL(pResInfo, numOfElems, 1);
|
SET_VAL(pResInfo, numOfElems, 1);
|
||||||
|
@ -2537,11 +2538,19 @@ static void apercentileTransferInfo(SAPercentileInfo* pInput, SAPercentileInfo*
|
||||||
if (pHisto->numOfElems <= 0) {
|
if (pHisto->numOfElems <= 0) {
|
||||||
memcpy(pHisto, pInput->pHisto, sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1));
|
memcpy(pHisto, pInput->pHisto, sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1));
|
||||||
pHisto->elems = (SHistBin*)((char*)pHisto + sizeof(SHistogramInfo));
|
pHisto->elems = (SHistBin*)((char*)pHisto + sizeof(SHistogramInfo));
|
||||||
|
|
||||||
|
qDebug("merge histo, total:%"PRId64", entry:%d, %p", pHisto->numOfElems, pHisto->numOfEntries, pHisto);
|
||||||
} else {
|
} else {
|
||||||
pHisto->elems = (SHistBin*)((char*)pHisto + sizeof(SHistogramInfo));
|
pHisto->elems = (SHistBin*)((char*)pHisto + sizeof(SHistogramInfo));
|
||||||
|
qDebug("input histogram, elem:%"PRId64", entry:%d, %p", pHisto->numOfElems, pHisto->numOfEntries,
|
||||||
|
pInput->pHisto);
|
||||||
|
|
||||||
SHistogramInfo* pRes = tHistogramMerge(pHisto, pInput->pHisto, MAX_HISTOGRAM_BIN);
|
SHistogramInfo* pRes = tHistogramMerge(pHisto, pInput->pHisto, MAX_HISTOGRAM_BIN);
|
||||||
memcpy(pHisto, pRes, sizeof(SHistogramInfo) + sizeof(SHistBin) * MAX_HISTOGRAM_BIN);
|
memcpy(pHisto, pRes, sizeof(SHistogramInfo) + sizeof(SHistBin) * MAX_HISTOGRAM_BIN);
|
||||||
pHisto->elems = (SHistBin*)((char*)pHisto + sizeof(SHistogramInfo));
|
pHisto->elems = (SHistBin*)((char*)pHisto + sizeof(SHistogramInfo));
|
||||||
|
|
||||||
|
qDebug("merge histo, total:%"PRId64", entry:%d, %p", pHisto->numOfElems, pHisto->numOfEntries,
|
||||||
|
pHisto);
|
||||||
tHistogramDestroy(&pRes);
|
tHistogramDestroy(&pRes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2557,14 +2566,18 @@ int32_t apercentileFunctionMerge(SqlFunctionCtx* pCtx) {
|
||||||
|
|
||||||
SAPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
SAPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
|
||||||
int32_t start = pInput->startRowIndex;
|
qDebug("total %d rows will merge, %p", pInput->numOfRows, pInfo->pHisto);
|
||||||
|
|
||||||
|
int32_t start = pInput->startRowIndex;
|
||||||
for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
|
for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
|
||||||
char* data = colDataGetData(pCol, i);
|
char* data = colDataGetData(pCol, i);
|
||||||
|
|
||||||
SAPercentileInfo* pInputInfo = (SAPercentileInfo*)varDataVal(data);
|
SAPercentileInfo* pInputInfo = (SAPercentileInfo*)varDataVal(data);
|
||||||
apercentileTransferInfo(pInputInfo, pInfo);
|
apercentileTransferInfo(pInputInfo, pInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
qDebug("after merge, total:%d, numOfEntry:%d, %p", pInfo->pHisto->numOfElems, pInfo->pHisto->numOfEntries, pInfo->pHisto);
|
||||||
|
|
||||||
SET_VAL(pResInfo, 1, 1);
|
SET_VAL(pResInfo, 1, 1);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -2582,6 +2595,8 @@ int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (pInfo->pHisto->numOfElems > 0) {
|
if (pInfo->pHisto->numOfElems > 0) {
|
||||||
|
qDebug("get the final res:%d, elements:%"PRId64", entry:%d", pInfo->pHisto->numOfElems, pInfo->pHisto->numOfEntries);
|
||||||
|
|
||||||
double ratio[] = {pInfo->percent};
|
double ratio[] = {pInfo->percent};
|
||||||
double* res = tHistogramUniform(pInfo->pHisto, ratio, 1);
|
double* res = tHistogramUniform(pInfo->pHisto, ratio, 1);
|
||||||
pInfo->result = *res;
|
pInfo->result = *res;
|
||||||
|
@ -2635,6 +2650,9 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx)
|
||||||
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
||||||
SAPercentileInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
SAPercentileInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
||||||
ASSERT(pDBuf->algo == pSBuf->algo);
|
ASSERT(pDBuf->algo == pSBuf->algo);
|
||||||
|
|
||||||
|
qDebug("start to combine apercentile, %p", pDBuf->pHisto);
|
||||||
|
|
||||||
apercentileTransferInfo(pSBuf, pDBuf);
|
apercentileTransferInfo(pSBuf, pDBuf);
|
||||||
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
|
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
Loading…
Reference in New Issue