From 1c46958cb4813dad693b066d42838c1262c505b8 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Sun, 5 Jun 2022 19:55:06 +0800 Subject: [PATCH] enh: check max/sum/avg func for rsma --- include/common/tdatablock.h | 2 +- source/common/src/tdatablock.c | 38 +++++++++++++--------- source/dnode/vnode/src/sma/smaRollup.c | 8 +++-- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 4 +-- source/dnode/vnode/src/tsdb/tsdbRead.c | 6 +++- 5 files changed, 36 insertions(+), 22 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index cce763cafe..bf16dd2fc4 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -232,7 +232,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData); void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, int8_t needCompress); const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData); -void blockDebugShowData(const SArray* dataBlocks); +void blockDebugShowData(const SArray* dataBlocks, const char* flag); int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, tb_uid_t suid); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 055992db53..909fee09aa 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1488,7 +1488,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { return buf; } -void blockDebugShowData(const SArray* dataBlocks) { +void blockDebugShowData(const SArray* dataBlocks, const char* flag) { char pBuf[128] = {0}; int32_t sz = taosArrayGetSize(dataBlocks); for (int32_t i = 0; i < sz; i++) { @@ -1496,7 +1496,7 @@ void blockDebugShowData(const SArray* dataBlocks) { int32_t colNum = pDataBlock->info.numOfCols; int32_t rows = pDataBlock->info.rows; for (int32_t j = 0; j < rows; j++) { - printf("|"); + printf("%s |", flag); for (int32_t k = 0; k < colNum; k++) { SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); @@ -1521,8 +1521,11 @@ void blockDebugShowData(const SArray* dataBlocks) { case TSDB_DATA_TYPE_UBIGINT: printf(" %15lu |", *(uint64_t*)var); break; + case TSDB_DATA_TYPE_FLOAT: + printf(" %15f |", *(float*)var); + break; case TSDB_DATA_TYPE_DOUBLE: - printf(" %15f |", *(double*)var); + printf(" %15lf |", *(double*)var); break; } } @@ -1550,8 +1553,6 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks bufSize += sizeof(SSubmitBlk); } - ASSERT(bufSize < 3 * 1024 * 1024); - *pReq = taosMemoryCalloc(1, bufSize); if (!(*pReq)) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1562,7 +1563,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks int32_t msgLen = sizeof(SSubmitReq); int32_t numOfBlks = 0; SRowBuilder rb = {0}; - tdSRowInit(&rb, pTSchema->version); // TODO: use the latest version + tdSRowInit(&rb, pTSchema->version); for (int32_t i = 0; i < sz; ++i) { SSDataBlock* pDataBlock = taosArrayGet(pDataBlocks, i); @@ -1580,18 +1581,17 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks pSubmitBlk->uid = pDataBlock->info.groupId; pSubmitBlk->numOfRows = rows; - ++numOfBlks; - msgLen += sizeof(SSubmitBlk); int32_t dataLen = 0; for (int32_t j = 0; j < rows; ++j) { // iterate by row tdSRowResetBuf(&rb, POINTER_SHIFT(pDataBuf, msgLen)); // set row buf - printf("|"); bool isStartKey = false; int32_t offset = 0; for (int32_t k = 0; k < colNum; ++k) { // iterate by column SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); - void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); + STColumn* pCol = &pTSchema->columns[k]; + ASSERT(pCol->type == pColInfoData->info.type); + void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); switch (pColInfoData->info.type) { case TSDB_DATA_TYPE_TIMESTAMP: if (!isStartKey) { @@ -1600,29 +1600,29 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks offset, k); } else { - tdAppendColValToRow(&rb, 2, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true, offset, k); + tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true, offset, k); } break; case TSDB_DATA_TYPE_NCHAR: { - tdAppendColValToRow(&rb, 2, TSDB_DATA_TYPE_NCHAR, TD_VTYPE_NORM, var, true, offset, k); + tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_NCHAR, TD_VTYPE_NORM, var, true, offset, k); break; } case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY - tdAppendColValToRow(&rb, 2, TSDB_DATA_TYPE_VARCHAR, TD_VTYPE_NORM, var, true, offset, k); + tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_VARCHAR, TD_VTYPE_NORM, var, true, offset, k); break; } case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_DECIMAL: case TSDB_DATA_TYPE_BLOB: case TSDB_DATA_TYPE_MEDIUMBLOB: - printf("the column type %" PRIi16 " is defined but not implemented yet\n", pColInfoData->info.type); + uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type); TASSERT(0); break; default: if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) { - tdAppendColValToRow(&rb, 2, pColInfoData->info.type, TD_VTYPE_NORM, var, true, offset, k); + tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pColInfoData->info.type, TD_VTYPE_NORM, var, true, offset, k); } else { - printf("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type); + uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type); TASSERT(0); } break; @@ -1630,7 +1630,13 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks offset += TYPE_BYTES[pColInfoData->info.type]; } dataLen += TD_ROW_LEN(rb.pBuf); +#ifdef TD_DEBUG_PRINT_ROW + tdSRowPrint(rb.pBuf, pTSchema, __func__); +#endif } + + ++numOfBlks; + pSubmitBlk->dataLen = dataLen; msgLen += pSubmitBlk->dataLen; } diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 80c8d20572..e738d3a408 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -400,7 +400,11 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3 } if (taosArrayGetSize(pResult) > 0) { - blockDebugShowData(pResult); +#if 1 + char flag[10] = {0}; + snprintf(flag, 10, "level %" PRIi8, level); + blockDebugShowData(pResult, flag); +#endif STsdb *sinkTsdb = (level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb1 : pSma->pRSmaTsdb2); SSubmitReq *pReq = NULL; if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), suid) != 0) { @@ -444,7 +448,7 @@ static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb } if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { - // TODO: use the proper schema instead of 0, and cache STSchema in cache + // TODO: cache STSchema STSchema *pTSchema = metaGetTbTSchema(SMA_META(pSma), suid, -1); if (!pTSchema) { terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 350e723541..fa392baa16 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -297,8 +297,8 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo tSkipListPutBatchByIter(pTbData->pData, &blkIter, (iter_next_fn_t)tGetSubmitBlkNext); #ifdef TD_DEBUG_PRINT_ROW - printf("!!! %s:%d table %" PRIi64 " has %d rows in skiplist\n\n", __func__, __LINE__, pTbData->uid, - SL_SIZE(pTbData->pData)); + printf("!!! %s:%d vgId:%d dir:%s table:%" PRIi64 " has %d rows in skiplist\n\n", __func__, __LINE__, + TD_VID(pTsdb->pVnode), pTsdb->dir, pTbData->uid, SL_SIZE(pTbData->pData)); #endif // Set statistics diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 5f2ea80078..b50dc95ca1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1661,7 +1661,11 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa } #ifdef TD_DEBUG_PRINT_ROW - tdSRowPrint(row1, pSchema1, __func__); + char flags[70] = {0}; + STsdb* pTsdb = pTsdbReadHandle->rhelper.pRepo; + snprintf(flags, 70, "%s:%d vgId:%d dir:%s row1%s=NULL,row2%s=NULL", __func__, __LINE__, TD_VID(pTsdb->pVnode), + pTsdb->dir, row1 ? "!" : "", row2 ? "!" : ""); + tdSRowPrint(row1, pSchema1, flags); #endif if (isRow1DataRow) {