From ddaae71391c7362ef01e0eb0720201076558542e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 17 Mar 2022 13:11:06 +0800 Subject: [PATCH 01/17] [td-13039] support write in and retrieve from vnode. --- include/common/tdatablock.h | 4 +- source/client/inc/clientInt.h | 33 +++++--- source/client/src/clientImpl.c | 92 ++++++++++++++++------- source/client/test/clientTests.cpp | 27 ++++--- source/common/src/tdatablock.c | 30 ++++---- source/dnode/vnode/src/tsdb/tsdbRead.c | 7 +- source/libs/executor/src/dataDispatcher.c | 60 +++++++++------ source/libs/executor/src/executorimpl.c | 8 +- source/libs/executor/src/tsort.c | 6 +- source/libs/parser/src/parInsert.c | 8 +- source/libs/qworker/src/qworker.c | 9 +-- 11 files changed, 180 insertions(+), 104 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 7e60013aa1..e3ff81be3a 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -106,7 +106,7 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock); int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf); size_t blockDataGetSize(const SSDataBlock* pBlock); -size_t blockDataGetRowSize(const SSDataBlock* pBlock); +size_t blockDataGetRowSize(SSDataBlock* pBlock); double blockDataGetSerialRowSize(const SSDataBlock* pBlock); size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock); @@ -117,7 +117,7 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRows); int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows); -void blockDataClearup(SSDataBlock* pDataBlock); +void blockDataCleanup(SSDataBlock* pDataBlock); SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock); size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize); void* blockDataDestroy(SSDataBlock* pBlock); diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 321e8ab77b..f5a370cf53 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -141,18 +141,27 @@ typedef struct STscObj { SAppInstInfo* pAppInfo; } STscObj; +typedef struct SResultColumn { + union { + char* nullbitmap; // bitmap, one bit for each item in the list + int32_t* offset; + }; + char* pData; +} SResultColumn; + typedef struct SReqResultInfo { - const char* pRspMsg; - const char* pData; - TAOS_FIELD* fields; - uint32_t numOfCols; - int32_t* length; - TAOS_ROW row; - char** pCol; - uint32_t numOfRows; - uint64_t totalRows; - uint32_t current; - bool completed; + const char* pRspMsg; + const char* pData; + TAOS_FIELD* fields; + uint32_t numOfCols; + int32_t* length; + TAOS_ROW row; + SResultColumn* pCol; + uint32_t numOfRows; + uint64_t totalRows; + uint32_t current; + bool completed; + int32_t payloadLen; } SReqResultInfo; typedef struct SShowReqInfo { @@ -224,7 +233,7 @@ TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, void* doFetchRow(SRequestObj* pRequest); -void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows); +int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows); int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 646b443fb7..96761facd0 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -14,7 +14,7 @@ static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet); static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest); static void destroySendMsgInfo(SMsgSendInfo* pMsgBody); -static void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp); +static int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp); static bool stringLengthCheck(const char* str, size_t maxsize) { if (str == NULL) { @@ -483,13 +483,16 @@ void* doFetchRow(SRequestObj* pRequest) { } SReqResultInfo* pResInfo = &pRequest->body.resInfo; - int32_t code = schedulerFetchRows(pRequest->body.queryJob, (void**)&pResInfo->pData); - if (code != TSDB_CODE_SUCCESS) { - pRequest->code = code; + pRequest->code = schedulerFetchRows(pRequest->body.queryJob, (void**)&pResInfo->pData); + if (pRequest->code != TSDB_CODE_SUCCESS) { + return NULL; + } + + pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData); + if (pRequest->code != TSDB_CODE_SUCCESS) { return NULL; } - setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData); tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64, pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId); @@ -556,10 +559,23 @@ void* doFetchRow(SRequestObj* pRequest) { _return: for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) { - pResultInfo->row[i] = pResultInfo->pCol[i] + pResultInfo->fields[i].bytes * pResultInfo->current; + SResultColumn* pCol = &pResultInfo->pCol[i]; + if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) { - pResultInfo->length[i] = varDataLen(pResultInfo->row[i]); - pResultInfo->row[i] = varDataVal(pResultInfo->row[i]); + if (pCol->offset[pResultInfo->current] != -1) { + char* pStart = pResultInfo->pCol[i].offset[pResultInfo->current] + pResultInfo->pCol[i].pData; + + pResultInfo->length[i] = varDataLen(pStart); + pResultInfo->row[i] = varDataVal(pStart); + } else { + pResultInfo->row[i] = NULL; + } + } else { + if (!colDataIsNull_f(pCol->nullbitmap, pResultInfo->current)) { + pResultInfo->row[i] = pResultInfo->pCol[i].pData + pResultInfo->fields[i].bytes * pResultInfo->current; + } else { + pResultInfo->row[i] = NULL; + } } } @@ -567,30 +583,52 @@ _return: return pResultInfo->row; } -static void doPrepareResPtr(SReqResultInfo* pResInfo) { +static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) { if (pResInfo->row == NULL) { - pResInfo->row = calloc(pResInfo->numOfCols, POINTER_BYTES); - pResInfo->pCol = calloc(pResInfo->numOfCols, POINTER_BYTES); + pResInfo->row = calloc(pResInfo->numOfCols, POINTER_BYTES); + pResInfo->pCol = calloc(pResInfo->numOfCols, sizeof(SResultColumn)); pResInfo->length = calloc(pResInfo->numOfCols, sizeof(int32_t)); } + + if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } else { + return TSDB_CODE_SUCCESS; + } } -void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) { +int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) { assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL); if (numOfRows == 0) { - return; + return TSDB_CODE_SUCCESS; } - // todo check for the failure of malloc - doPrepareResPtr(pResultInfo); + int32_t code = doPrepareResPtr(pResultInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; + } - int32_t offset = 0; + int32_t* colLength = (int32_t*)pResultInfo->pData; + char* pStart = ((char*)pResultInfo->pData) + sizeof(int32_t) * numOfCols; for (int32_t i = 0; i < numOfCols; ++i) { + colLength[i] = htonl(colLength[i]); + + if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) { + pResultInfo->pCol[i].offset = (int32_t*)pStart; + pStart += numOfRows * sizeof(int32_t); + } else { + pResultInfo->pCol[i].nullbitmap = pStart; + pStart += BitmapLen(pResultInfo->numOfRows); + } + + pResultInfo->pCol[i].pData = pStart; pResultInfo->length[i] = pResultInfo->fields[i].bytes; - pResultInfo->row[i] = (char*)(pResultInfo->pData + offset * pResultInfo->numOfRows); - pResultInfo->pCol[i] = pResultInfo->row[i]; - offset += pResultInfo->fields[i].bytes; + pResultInfo->row[i] = pResultInfo->pCol[i].pData; + + pStart += colLength[i]; } + + return TSDB_CODE_SUCCESS; } char* getDbOfConnection(STscObj* pObj) { @@ -612,15 +650,17 @@ void setConnectionDB(STscObj* pTscObj, const char* db) { pthread_mutex_unlock(&pTscObj->mutex); } -void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) { +int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) { assert(pResultInfo != NULL && pRsp != NULL); - pResultInfo->pRspMsg = (const char*)pRsp; - pResultInfo->pData = (void*)pRsp->data; - pResultInfo->numOfRows = htonl(pRsp->numOfRows); - pResultInfo->current = 0; - pResultInfo->completed = (pRsp->completed == 1); + pResultInfo->pRspMsg = (const char*)pRsp; + pResultInfo->pData = (void*)pRsp->data; + pResultInfo->numOfRows = htonl(pRsp->numOfRows); + pResultInfo->current = 0; + pResultInfo->completed = (pRsp->completed == 1); + pResultInfo->payloadLen = htonl(pRsp->compLen); + // TODO handle the compressed case pResultInfo->totalRows += pResultInfo->numOfRows; - setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows); + return setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows); } diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 1a155beb90..7d03040efb 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -52,7 +52,7 @@ TEST(testCase, driverInit_Test) { // taosInitGlobalCfg(); // taos_init(); } - +#if 0 TEST(testCase, connect_Test) { // taos_options(TSDB_OPTION_CONFIGDIR, "/home/ubuntu/first/cfg"); @@ -648,6 +648,7 @@ TEST(testCase, projection_query_stables) { taos_free_result(pRes); taos_close(pConn); } +#endif TEST(testCase, agg_query_tables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -656,18 +657,20 @@ TEST(testCase, agg_query_tables) { TAOS_RES* pRes = taos_query(pConn, "use abc1"); taos_free_result(pRes); - pRes = taos_query(pConn, "create table tx using st1 tags(111111111111111)"); - if (taos_errno(pRes) != 0) { - printf("failed to create table, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); + pRes = taos_query(pConn, "select * from tu"); - pRes = taos_query(pConn, "select count(*) from t_x_19"); - if (taos_errno(pRes) != 0) { - printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - ASSERT_TRUE(false); - } +// pRes = taos_query(pConn, "create table tx using st1 tags(111111111111111)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create table, reason:%s\n", taos_errstr(pRes)); +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "select count(*) from tu"); +// if (taos_errno(pRes) != 0) { +// printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); +// taos_free_result(pRes); +// ASSERT_TRUE(false); +// } TAOS_ROW pRow = NULL; TAOS_FIELD* pFields = taos_fetch_fields(pRes); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 7e4f1d9025..93fe314c10 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -430,11 +430,11 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3 /** * - * +------------------+---------------+--------------------+ - * |the number of rows| column length | column #1 | - * | (4 bytes) | (4 bytes) |--------------------+ - * | | | null bitmap| values| - * +------------------+---------------+--------------------+ + * +------------------+---------------------------------------------+ + * |the number of rows| column #1 | + * | (4 bytes) |------------+-----------------------+--------+ + * | | null bitmap| column length(4bytes) | values | + * +------------------+------------+-----------------------+--------+ * @param buf * @param pBlock * @return @@ -515,17 +515,21 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { return TSDB_CODE_SUCCESS; } -size_t blockDataGetRowSize(const SSDataBlock* pBlock) { +size_t blockDataGetRowSize(SSDataBlock* pBlock) { ASSERT(pBlock != NULL); - size_t rowSize = 0; + if (pBlock->info.rowSize == 0) { + size_t rowSize = 0; - size_t numOfCols = pBlock->info.numOfCols; - for(int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); - rowSize += pColInfo->info.bytes; + size_t numOfCols = pBlock->info.numOfCols; + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); + rowSize += pColInfo->info.bytes; + } + + pBlock->info.rowSize = rowSize; } - return rowSize; + return pBlock->info.rowSize; } /** @@ -1059,7 +1063,7 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF // destroyTupleIndex(index); } -void blockDataClearup(SSDataBlock* pDataBlock) { +void blockDataCleanup(SSDataBlock* pDataBlock) { pDataBlock->info.rows = 0; if (pDataBlock->info.hasVarCol) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 97c52f44eb..572b572167 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -404,7 +404,12 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond, colInfo.info = pCond->colList[i]; colInfo.pData = calloc(1, EXTRA_BYTES + pReadHandle->outputCapacity * pCond->colList[i].bytes); - if (colInfo.pData == NULL) { + + if (!IS_VAR_DATA_TYPE(colInfo.info.type)) { + colInfo.nullbitmap = calloc(1, BitmapLen(pReadHandle->outputCapacity)); + } + + if (colInfo.pData == NULL || (colInfo.nullbitmap == NULL && (!IS_VAR_DATA_TYPE(colInfo.info.type)))) { goto _end; } diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index a0ee048d82..e185c5444f 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -15,11 +15,12 @@ #include "dataSinkInt.h" #include "dataSinkMgt.h" +#include "executorimpl.h" #include "planner.h" #include "tcompression.h" #include "tglobal.h" #include "tqueue.h" -#include "executorimpl.h" +#include "tdatablock.h" typedef struct SDataDispatchBuf { int32_t useSize; @@ -64,29 +65,47 @@ static bool needCompress(const SSDataBlock* pData, const SDataBlockDescNode* pSc } static int32_t compressColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) { - int32_t colSize = pColRes->info.bytes * numOfRows; + int32_t colSize = colDataGetLength(pColRes, numOfRows); return (*(tDataTypes[pColRes->info.type].compFunc))( pColRes->pData, colSize, numOfRows, data, colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0); } -static void copyData(const SInputData* pInput, const SDataBlockDescNode* pSchema, char* data, int8_t compressed, int32_t *compLen) { +static void copyData(const SInputData* pInput, const SDataBlockDescNode* pSchema, char* data, int8_t compressed, int32_t * dataLen) { int32_t numOfCols = LIST_LENGTH(pSchema->pSlots); - int32_t *compSizes = (int32_t*)data; - if (compressed) { - data += numOfCols * sizeof(int32_t); - } + int32_t * colSizes = (int32_t*)data; + data += numOfCols * sizeof(int32_t); + *dataLen = (numOfCols * sizeof(int32_t)); + + int32_t numOfRows = pInput->pData->info.rows; for (int32_t col = 0; col < numOfCols; ++col) { SColumnInfoData* pColRes = taosArrayGet(pInput->pData->pDataBlock, col); - if (compressed) { - compSizes[col] = compressColData(pColRes, pInput->pData->info.rows, data, compressed); - data += compSizes[col]; - *compLen += compSizes[col]; - compSizes[col] = htonl(compSizes[col]); + + // copy the null bitmap + if (IS_VAR_DATA_TYPE(pColRes->info.type)) { + size_t metaSize = numOfRows * sizeof(int32_t); + memcpy(data, pColRes->varmeta.offset, metaSize); + data += metaSize; + (*dataLen) += metaSize; } else { - memmove(data, pColRes->pData, pColRes->info.bytes * pInput->pData->info.rows); - data += pColRes->info.bytes * pInput->pData->info.rows; + int32_t len = BitmapLen(numOfRows); + memcpy(data, pColRes->nullbitmap, len); + data += len; + (*dataLen) += len; } + + if (compressed) { + colSizes[col] = compressColData(pColRes, numOfRows, data, compressed); + data += colSizes[col]; + (*dataLen) += colSizes[col]; + } else { + colSizes[col] = colDataGetLength(pColRes, numOfRows); + (*dataLen) += colSizes[col]; + memmove(data, pColRes->pData, colSizes[col]); + data += colSizes[col]; + } + + colSizes[col] = htonl(colSizes[col]); } } @@ -95,16 +114,14 @@ static void copyData(const SInputData* pInput, const SDataBlockDescNode* pSchema static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) { SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData; pEntry->compressed = (int8_t)needCompress(pInput->pData, pHandle->pSchema); - pEntry->numOfRows = pInput->pData->info.rows; - pEntry->dataLen = 0; + pEntry->numOfRows = pInput->pData->info.rows; + pEntry->dataLen = 0; pBuf->useSize = sizeof(SRetrieveTableRsp); copyData(pInput, pHandle->pSchema, pEntry->data, pEntry->compressed, &pEntry->dataLen); - if (0 == pEntry->compressed) { - pEntry->dataLen = pHandle->pSchema->resultRowSize * pInput->pData->info.rows; - } - pBuf->useSize += pEntry->dataLen; - // todo completed + + pEntry->dataLen = pEntry->dataLen; + pBuf->useSize += pEntry->dataLen; } static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) { @@ -169,6 +186,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryE *pLen = 0; return; } + SDataDispatchBuf* pBuf = NULL; taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf); memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf)); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 947fb08ff9..1127afe47b 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3986,7 +3986,7 @@ static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResI static void toSDatablock(SGroupResInfo *pGroupResInfo, SDiskbasedBuf* pBuf, SSDataBlock* pBlock, int32_t rowCapacity, int32_t* rowCellOffset) { assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup); - blockDataClearup(pBlock); + blockDataCleanup(pBlock); if (!hasRemainDataInCurrentGroup(pGroupResInfo)) { return; } @@ -5794,7 +5794,7 @@ static void appendOneRowToDataBlock(SSDataBlock *pBlock, STupleHandle* pTupleHan } static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, bool hasVarCol, int32_t capacity) { - blockDataClearup(pDataBlock); + blockDataCleanup(pDataBlock); while(1) { STupleHandle* pTupleHandle = tsortNextTuple(pHandle); @@ -5950,7 +5950,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) { while(1) { - blockDataClearup(pDataBlock); + blockDataCleanup(pDataBlock); while (1) { STupleHandle* pTupleHandle = tsortNextTuple(pHandle); if (pTupleHandle == NULL) { @@ -6366,7 +6366,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo *pOperator, bool* newgroup) SOptrBasicInfo *pInfo = &pProjectInfo->binfo; SSDataBlock* pRes = pInfo->pRes; - blockDataClearup(pRes); + blockDataCleanup(pRes); if (pProjectInfo->existDataBlock) { // TODO refactor // STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 08bab762be..d424599758 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -182,7 +182,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { start = stop + 1; } - blockDataClearup(pDataBlock); + blockDataCleanup(pDataBlock); SSDataBlock* pBlock = createOneDataBlock(pDataBlock); int32_t code = doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId); @@ -312,7 +312,7 @@ static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwa } static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SMsortComparParam* cmpParam, int32_t capacity) { - blockDataClearup(pHandle->pDataBlock); + blockDataCleanup(pHandle->pDataBlock); while(1) { if (cmpParam->numOfSources == pHandle->numOfCompletedSources) { @@ -478,7 +478,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { setBufPageDirty(pPage, true); releaseBufPage(pHandle->pBuf, pPage); - blockDataClearup(pDataBlock); + blockDataCleanup(pDataBlock); } tMergeTreeDestroy(pHandle->pMergeTree); diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 43cc308483..5ff6e5f709 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -453,7 +453,7 @@ static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int return func(&tmpVal, pSchema->bytes, param); } - return func(getNullValue(pSchema->type), 0, param); + return func(NULL, 0, param); } switch (pSchema->type) { @@ -628,7 +628,11 @@ static FORCE_INLINE int32_t MemRowAppend(const void* value, int32_t len, void* p varDataSetLen(rowEnd, output); tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, rowEnd, false, pa->toffset, pa->colIdx); } else { - tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, value, true, pa->toffset, pa->colIdx); + if (value == NULL) { // it is a null data + tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NULL, value, true, pa->toffset, pa->colIdx); + } else { + tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, value, true, pa->toffset, pa->colIdx); + } } return TSDB_CODE_SUCCESS; } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 42890ab38a..5f7d0ff827 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -603,28 +603,21 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void } QW_TASK_DLOG("no data in sink and query end, phase:%d", ctx->phase); - QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED)); - QW_ERR_RET(qwMallocFetchRsp(len, &rsp)); + *rspMsg = rsp; *dataLen = 0; - return TSDB_CODE_SUCCESS; } pOutput->bufStatus = DS_BUF_EMPTY; - QW_TASK_DLOG("no res data in sink, need response later, queryEnd:%d", queryEnd); - return TSDB_CODE_SUCCESS; } - // Got data from sink - *dataLen = len; - QW_TASK_DLOG("task got data in sink, dataLength:%d", len); QW_ERR_RET(qwMallocFetchRsp(len, &rsp)); From 554f12db50bf2252aa5a59b20210895a05ced4a9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 17 Mar 2022 13:42:49 +0800 Subject: [PATCH 02/17] [td-13039] fix bugs. --- source/common/src/tdatablock.c | 2 +- source/libs/executor/src/dataDispatcher.c | 16 ++++++++++++---- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 9f03e179e3..e26f3ca128 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -570,7 +570,7 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock) { if (IS_VAR_DATA_TYPE(pColInfo->info.type)) { rowSize += sizeof(int32_t); } else { - rowSize += 1/8.0; + rowSize += 1/8.0; // one bit for each record } } diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index e185c5444f..64b54ab135 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -109,8 +109,13 @@ static void copyData(const SInputData* pInput, const SDataBlockDescNode* pSchema } } -// data format with compress: SDataCacheEntry | cols_data_offset | col1_data col2_data ... | numOfTables | STableIdInfo STableIdInfo ... -// data format: SDataCacheEntry | col1_data col2_data ... | numOfTables | STableIdInfo STableIdInfo ... +// data format: +// +----------------+--------------------------------------+-------------+-----------+-------------+-----------+ +// |SDataCacheEntry | column#1 length, column#2 length ... | col1 bitmap | col1 data | col2 bitmap | col2 data | .... +// | | sizeof(int32_t) * numOfCols | actual size | | actual size | | +// +----------------+--------------------------------------+-------------+-----------+-------------+-----------+ +// The length of bitmap is decided by number of rows of this data block, and the length of each column data is +// recorded in the first segment, next to the struct header static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) { SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData; pEntry->compressed = (int8_t)needCompress(pInput->pData, pHandle->pSchema); @@ -132,8 +137,11 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, return false; } - // struct size + data payload + length for each column - pBuf->allocSize = sizeof(SRetrieveTableRsp) + pDispatcher->pSchema->resultRowSize * pInput->pData->info.rows + pInput->pData->info.numOfCols * sizeof(int32_t); + // NOTE: there are four bytes of an integer more than the required buffer space. + // struct size + data payload + length for each column + bitmap length + pBuf->allocSize = sizeof(SRetrieveTableRsp) + blockDataGetSerialMetaSize(pInput->pData) + + __ceill(blockDataGetSerialRowSize(pInput->pData) * pInput->pData->info.rows); + pBuf->pData = malloc(pBuf->allocSize); if (pBuf->pData == NULL) { qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno)); From c503c00ff8b6f5efee3cd5a43eb231226700ee2e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 17 Mar 2022 13:52:35 +0800 Subject: [PATCH 03/17] [td-13039] fix compiler error. --- source/libs/executor/src/dataDispatcher.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 64b54ab135..0e1fa5267d 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -140,7 +140,7 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, // NOTE: there are four bytes of an integer more than the required buffer space. // struct size + data payload + length for each column + bitmap length pBuf->allocSize = sizeof(SRetrieveTableRsp) + blockDataGetSerialMetaSize(pInput->pData) + - __ceill(blockDataGetSerialRowSize(pInput->pData) * pInput->pData->info.rows); + ceil(blockDataGetSerialRowSize(pInput->pData) * pInput->pData->info.rows); pBuf->pData = malloc(pBuf->allocSize); if (pBuf->pData == NULL) { From 51a6e6355ff72724227090844c0b11606e409bcf Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 17 Mar 2022 14:33:31 +0800 Subject: [PATCH 04/17] [td-13039] refactor. --- include/common/ttypes.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/common/ttypes.h b/include/common/ttypes.h index 5aa22bdcad..43c1d082f2 100644 --- a/include/common/ttypes.h +++ b/include/common/ttypes.h @@ -46,7 +46,7 @@ typedef struct { #define varDataCopy(dst, v) memcpy((dst), (void *)(v), varDataTLen(v)) #define varDataLenByData(v) (*(VarDataLenT *)(((char *)(v)) - VARSTR_HEADER_SIZE)) #define varDataSetLen(v, _len) (((VarDataLenT *)(v))[0] = (VarDataLenT)(_len)) -#define IS_VAR_DATA_TYPE(t) (((t) == TSDB_DATA_TYPE_BINARY) || ((t) == TSDB_DATA_TYPE_NCHAR)) +#define IS_VAR_DATA_TYPE(t) (((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_NCHAR)) #define varDataNetLen(v) (htons(((VarDataLenT *)(v))[0])) #define varDataNetTLen(v) (sizeof(VarDataLenT) + varDataNetLen(v)) From cf1e4507f3f5904b6c69bf07f980a1575a62b9f4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 19 Mar 2022 23:12:58 +0800 Subject: [PATCH 05/17] [td-13039] refactor. --- source/libs/function/src/builtins.c | 70 +++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index edb0acf075..9f0a03b647 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -61,6 +61,76 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = maxFunction, .finalizeFunc = functionFinalizer }, + { + .name = "stddev", + .type = FUNCTION_TYPE_STDDEV, + .classification = FUNC_MGT_AGG_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = getMinmaxFuncEnv, + .initFunc = maxFunctionSetup, + .processFunc = maxFunction, + .finalizeFunc = functionFinalizer + }, + { + .name = "percentile", + .type = FUNCTION_TYPE_PERCENTILE, + .classification = FUNC_MGT_AGG_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = getMinmaxFuncEnv, + .initFunc = maxFunctionSetup, + .processFunc = maxFunction, + .finalizeFunc = functionFinalizer + }, + { + .name = "apercentile", + .type = FUNCTION_TYPE_APERCENTILE, + .classification = FUNC_MGT_AGG_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = getMinmaxFuncEnv, + .initFunc = maxFunctionSetup, + .processFunc = maxFunction, + .finalizeFunc = functionFinalizer + }, + { + .name = "top", + .type = FUNCTION_TYPE_TOP, + .classification = FUNC_MGT_AGG_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = getMinmaxFuncEnv, + .initFunc = maxFunctionSetup, + .processFunc = maxFunction, + .finalizeFunc = functionFinalizer + }, + { + .name = "bottom", + .type = FUNCTION_TYPE_BOTTOM, + .classification = FUNC_MGT_AGG_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = getMinmaxFuncEnv, + .initFunc = maxFunctionSetup, + .processFunc = maxFunction, + .finalizeFunc = functionFinalizer + }, + { + .name = "spread", + .type = FUNCTION_TYPE_SPREAD, + .classification = FUNC_MGT_AGG_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = getMinmaxFuncEnv, + .initFunc = maxFunctionSetup, + .processFunc = maxFunction, + .finalizeFunc = functionFinalizer + }, + { + .name = "last_row", + .type = FUNCTION_TYPE_LAST_ROW, + .classification = FUNC_MGT_AGG_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = getMinmaxFuncEnv, + .initFunc = maxFunctionSetup, + .processFunc = maxFunction, + .finalizeFunc = functionFinalizer + }, { .name = "concat", .type = FUNCTION_TYPE_CONCAT, From 84818987b3873a697dee8f959f03066e4ec6f77d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 21 Mar 2022 15:25:13 +0800 Subject: [PATCH 06/17] [td-13039] fix bug in show tables; --- include/libs/executor/executor.h | 6 ++- source/libs/executor/src/executor.c | 19 ++++--- source/libs/function/inc/builtinsimpl.h | 6 ++- source/libs/function/src/builtins.c | 24 ++++----- source/libs/function/src/builtinsimpl.c | 71 ++++++++++++++++++++++++- 5 files changed, 103 insertions(+), 23 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index d4af51fc21..6091456780 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -32,6 +32,9 @@ typedef struct SReadHandle { void* meta; } SReadHandle; +#define STREAM_DATA_TYPE_SUBMITBLK 0x1u +#define STREAM_DATA_TYPE_SSDATABLK 0x2u + /** * Create the exec task for streaming mode * @param pMsg @@ -44,9 +47,10 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void *msg, void* streamReadHandle); * Set the input data block for the stream scan. * @param tinfo * @param input + * @param type * @return */ -int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input); +int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type); /** * Update the table id list, add or remove. diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index a8602b7c77..3720e1c179 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -18,7 +18,7 @@ #include "planner.h" #include "tq.h" -static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, char* id) { +static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t type, char* id) { ASSERT(pOperator != NULL); if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->numOfDownstream == 0) { @@ -31,18 +31,23 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, char* id) return TSDB_CODE_QRY_APP_ERROR; } - return doSetStreamBlock(pOperator->pDownstream[0], input, id); + // TODO handle the join case + return doSetStreamBlock(pOperator->pDownstream[0], input, type, id); } else { SStreamBlockScanInfo* pInfo = pOperator->info; - if (tqReadHandleSetMsg(pInfo->readerHandle, input, 0) < 0) { - qError("submit msg messed up when initing stream block, %s" PRIx64, id); - return TSDB_CODE_QRY_APP_ERROR; + if (type == STREAM_DATA_TYPE_SUBMITBLK) { + if (tqReadHandleSetMsg(pInfo->readerHandle, input, 0) < 0) { + qError("submit msg messed up when initing stream block, %s" PRIx64, id); + return TSDB_CODE_QRY_APP_ERROR; + } + } else { // TODO + } return TSDB_CODE_SUCCESS; } } -int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input) { +int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type) { if (tinfo == NULL) { return TSDB_CODE_QRY_APP_ERROR; } @@ -53,7 +58,7 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void*)input, GET_TASKID(pTaskInfo)); + int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void*)input, type, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo)); } else { diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 7ba7d7bdcc..e67215dfa5 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -23,7 +23,7 @@ extern "C" { #include "function.h" bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); -void functionFinalizer(SqlFunctionCtx *pCtx); +void functionFinalize(SqlFunctionCtx *pCtx); bool getCountFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); void countFunction(SqlFunctionCtx *pCtx); @@ -37,6 +37,10 @@ bool getMinmaxFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); void minFunction(SqlFunctionCtx* pCtx); void maxFunction(SqlFunctionCtx *pCtx); +bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); +void stddevFunction(SqlFunctionCtx* pCtx); +void stddevFinalize(SqlFunctionCtx* pCtx); + #ifdef __cplusplus } #endif diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 9f0a03b647..15f8ca2d10 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -29,7 +29,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getCountFuncEnv, .initFunc = functionSetup, .processFunc = countFunction, - .finalizeFunc = functionFinalizer + .finalizeFunc = functionFinalize }, { .name = "sum", @@ -39,7 +39,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getSumFuncEnv, .initFunc = functionSetup, .processFunc = sumFunction, - .finalizeFunc = functionFinalizer + .finalizeFunc = functionFinalize }, { .name = "min", @@ -49,7 +49,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getMinmaxFuncEnv, .initFunc = minFunctionSetup, .processFunc = minFunction, - .finalizeFunc = functionFinalizer + .finalizeFunc = functionFinalize }, { .name = "max", @@ -59,17 +59,17 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getMinmaxFuncEnv, .initFunc = maxFunctionSetup, .processFunc = maxFunction, - .finalizeFunc = functionFinalizer + .finalizeFunc = functionFinalize }, { .name = "stddev", .type = FUNCTION_TYPE_STDDEV, .classification = FUNC_MGT_AGG_FUNC, .checkFunc = stubCheckAndGetResultType, - .getEnvFunc = getMinmaxFuncEnv, + .getEnvFunc = getStddevFuncEnv, .initFunc = maxFunctionSetup, .processFunc = maxFunction, - .finalizeFunc = functionFinalizer + .finalizeFunc = functionFinalize }, { .name = "percentile", @@ -79,7 +79,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getMinmaxFuncEnv, .initFunc = maxFunctionSetup, .processFunc = maxFunction, - .finalizeFunc = functionFinalizer + .finalizeFunc = functionFinalize }, { .name = "apercentile", @@ -89,7 +89,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getMinmaxFuncEnv, .initFunc = maxFunctionSetup, .processFunc = maxFunction, - .finalizeFunc = functionFinalizer + .finalizeFunc = functionFinalize }, { .name = "top", @@ -99,7 +99,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getMinmaxFuncEnv, .initFunc = maxFunctionSetup, .processFunc = maxFunction, - .finalizeFunc = functionFinalizer + .finalizeFunc = functionFinalize }, { .name = "bottom", @@ -109,7 +109,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getMinmaxFuncEnv, .initFunc = maxFunctionSetup, .processFunc = maxFunction, - .finalizeFunc = functionFinalizer + .finalizeFunc = functionFinalize }, { .name = "spread", @@ -119,7 +119,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getMinmaxFuncEnv, .initFunc = maxFunctionSetup, .processFunc = maxFunction, - .finalizeFunc = functionFinalizer + .finalizeFunc = functionFinalize }, { .name = "last_row", @@ -129,7 +129,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getMinmaxFuncEnv, .initFunc = maxFunctionSetup, .processFunc = maxFunction, - .finalizeFunc = functionFinalizer + .finalizeFunc = functionFinalize }, { .name = "concat", diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index f0f00434f0..fd3c022739 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -50,7 +50,7 @@ bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { static void doFinalizer(SResultRowEntryInfo* pResInfo) { cleanupResultRowEntry(pResInfo); } -void functionFinalizer(SqlFunctionCtx *pCtx) { +void functionFinalize(SqlFunctionCtx *pCtx) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); doFinalizer(pResInfo); } @@ -441,4 +441,71 @@ void minFunction(SqlFunctionCtx *pCtx) { void maxFunction(SqlFunctionCtx *pCtx) { int32_t numOfElems = doMinMaxHelper(pCtx, 0); SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1); -} \ No newline at end of file +} + +typedef struct STopBotRes { + int32_t num; +} STopBotRes; + +bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { + SColumnNode* pColNode = (SColumnNode*) nodesListGetNode(pFunc->pParameterList, 0); + int32_t bytes = pColNode->node.resType.bytes; + SValueNode* pkNode = (SValueNode*) nodesListGetNode(pFunc->pParameterList, 1); + return true; +} + +typedef struct SStddevRes { + int64_t count; + union {double quadraticDSum; int64_t quadraticISum;}; + union {double dsum; int64_t isum;}; +} SStddevRes; + +bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { + pEnv->calcMemSize = sizeof(SStddevRes); + return true; +} + +void stddevFunction(SqlFunctionCtx* pCtx) { + int32_t numOfElem = 0; + + // Only the pre-computing information loaded and actual data does not loaded + SInputColumnInfoData* pInput = &pCtx->input; + SColumnDataAgg *pAgg = pInput->pColumnDataAgg[0]; + int32_t type = pInput->pData[0]->info.type; + + SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + +// } else { // computing based on the true data block + SColumnInfoData* pCol = pInput->pData[0]; + + int32_t start = pInput->startRowIndex; + int32_t numOfRows = pInput->numOfRows; + + switch(type) { + case TSDB_DATA_TYPE_INT: { + int32_t* plist = (int32_t*)pCol->pData; + for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + pStddevRes->count += 1; + pStddevRes->isum += plist[i]; + pStddevRes->quadraticISum += plist[i] * plist[i]; + } + } + break; + } + + // data in the check operation are all null, not output + SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1); +} + +void stddevFinalize(SqlFunctionCtx* pCtx) { + functionFinalize(pCtx); + + SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + double res = pStddevRes->quadraticISum/pStddevRes->count - (pStddevRes->isum / pStddevRes->count) * (pStddevRes->isum / pStddevRes->count); +} + + From 0081c145aedc65b2a576d0acc3f36f0e1fd25e7a Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Tue, 22 Mar 2022 13:57:50 +0800 Subject: [PATCH 07/17] rename vnodes to vnode --- source/dnode/mgmt/vnode/src/vmInt.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mgmt/vnode/src/vmInt.c b/source/dnode/mgmt/vnode/src/vmInt.c index 1c6c7d089e..c6122c0501 100644 --- a/source/dnode/mgmt/vnode/src/vmInt.c +++ b/source/dnode/mgmt/vnode/src/vmInt.c @@ -342,7 +342,7 @@ void vmGetMgmtFp(SMgmtWrapper *pWrapper) { mgmtFp.requiredFp = vmRequire; vmInitMsgHandles(pWrapper); - pWrapper->name = "vnodes"; + pWrapper->name = "vnode"; pWrapper->fp = mgmtFp; } From 3cdeed0d447bc8bee039cdc62fc912b7d7fe1db9 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Tue, 22 Mar 2022 13:59:48 +0800 Subject: [PATCH 08/17] rename TDB to TSDB for log --- source/dnode/vnode/src/inc/tsdbLog.h | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdbLog.h b/source/dnode/vnode/src/inc/tsdbLog.h index 56dc8ab2a0..3afe628198 100644 --- a/source/dnode/vnode/src/inc/tsdbLog.h +++ b/source/dnode/vnode/src/inc/tsdbLog.h @@ -24,12 +24,12 @@ extern "C" { extern int32_t tsdbDebugFlag; -#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) -#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TDB ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) -#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TDB WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0) -#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TDB ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) -#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TDB ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0) -#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) +#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TSDB FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) +#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TSDB ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) +#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TSDB WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0) +#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TSDB ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) +#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSDB ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0) +#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) #ifdef __cplusplus } From eb5471fdd62b0d349b58efea27e1139181a8aa7f Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Tue, 22 Mar 2022 14:57:16 +0800 Subject: [PATCH 09/17] tsdb merge --- include/common/trow.h | 8 ++++++-- source/dnode/vnode/src/tsdb/tsdbReadImpl.c | 4 ++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/include/common/trow.h b/include/common/trow.h index 47edf6f1ad..4ae5a2277d 100644 --- a/include/common/trow.h +++ b/include/common/trow.h @@ -968,10 +968,14 @@ static FORCE_INLINE int32_t tdGetColDataOfRow(SCellVal *pVal, SDataCol *pCol, in #endif return TSDB_CODE_SUCCESS; } - if (tdGetBitmapValType(pCol->pBitmap, row, &(pVal->valType)) < 0) { + + if (TD_COL_ROWS_NORM(pCol)) { + pVal->valType = TD_VTYPE_NORM; + } else if (tdGetBitmapValType(pCol->pBitmap, row, &(pVal->valType)) < 0) { return terrno; } - if (TD_COL_ROWS_NORM(pCol) || tdValTypeIsNorm(pVal->valType)) { + + if (tdValTypeIsNorm(pVal->valType)) { if (IS_VAR_DATA_TYPE(pCol->type)) { pVal->val = POINTER_SHIFT(pCol->pData, pCol->dataOff[row]); } else { diff --git a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c index f6827eaae1..9619ac036e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c @@ -727,9 +727,9 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols * } ASSERT(pBlockCol->colId == pDataCol->colId); - // set the bitmap - pDataCol->bitmap = pBlockCol->bitmap; } + // set the bitmap + pDataCol->bitmap = pBlockCol->bitmap; if (tsdbLoadColData(pReadh, pDFile, pBlock, pBlockCol, pDataCol) < 0) return -1; } From 1a1008548eadc5ab8686976f1fb7eeeadf14aa94 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Tue, 22 Mar 2022 15:17:45 +0800 Subject: [PATCH 10/17] trigger CI From 2dc845fba9856c24263190fa628e9f27781fd4c7 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Tue, 22 Mar 2022 15:32:36 +0800 Subject: [PATCH 11/17] restore --- source/dnode/mgmt/vnode/src/vmInt.c | 2 +- source/dnode/vnode/src/inc/tsdbLog.h | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/source/dnode/mgmt/vnode/src/vmInt.c b/source/dnode/mgmt/vnode/src/vmInt.c index c6122c0501..1c6c7d089e 100644 --- a/source/dnode/mgmt/vnode/src/vmInt.c +++ b/source/dnode/mgmt/vnode/src/vmInt.c @@ -342,7 +342,7 @@ void vmGetMgmtFp(SMgmtWrapper *pWrapper) { mgmtFp.requiredFp = vmRequire; vmInitMsgHandles(pWrapper); - pWrapper->name = "vnode"; + pWrapper->name = "vnodes"; pWrapper->fp = mgmtFp; } diff --git a/source/dnode/vnode/src/inc/tsdbLog.h b/source/dnode/vnode/src/inc/tsdbLog.h index 3afe628198..56dc8ab2a0 100644 --- a/source/dnode/vnode/src/inc/tsdbLog.h +++ b/source/dnode/vnode/src/inc/tsdbLog.h @@ -24,12 +24,12 @@ extern "C" { extern int32_t tsdbDebugFlag; -#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TSDB FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) -#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TSDB ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) -#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TSDB WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0) -#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TSDB ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) -#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSDB ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0) -#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) +#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) +#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TDB ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) +#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TDB WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0) +#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TDB ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) +#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TDB ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0) +#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) #ifdef __cplusplus } From cf95e5a7697a726b033335f5e635a7766016b44d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 22 Mar 2022 23:33:03 +0800 Subject: [PATCH 12/17] [td-13039] update test. --- source/client/test/clientTests.cpp | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 1b2c955946..ac092c8f10 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -660,20 +660,12 @@ TEST(testCase, agg_query_tables) { TAOS_RES* pRes = taos_query(pConn, "use abc1"); taos_free_result(pRes); - pRes = taos_query(pConn, "select * from tu"); - -// pRes = taos_query(pConn, "create table tx using st1 tags(111111111111111)"); -// if (taos_errno(pRes) != 0) { -// printf("failed to create table, reason:%s\n", taos_errstr(pRes)); -// } -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "select count(*) from tu"); -// if (taos_errno(pRes) != 0) { -// printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); -// taos_free_result(pRes); -// ASSERT_TRUE(false); -// } + pRes = taos_query(pConn, "select count(*), sum(k),min(k),max(k) from tu"); + if (taos_errno(pRes) != 0) { + printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } TAOS_ROW pRow = NULL; TAOS_FIELD* pFields = taos_fetch_fields(pRes); From 636edd5568067de3a838d17ab75920234d9882d1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 22 Mar 2022 23:33:53 +0800 Subject: [PATCH 13/17] [td-13039] update test. --- tools/shell/src/shellMain.c | 671 ------------------------------------ 1 file changed, 671 deletions(-) delete mode 100644 tools/shell/src/shellMain.c diff --git a/tools/shell/src/shellMain.c b/tools/shell/src/shellMain.c deleted file mode 100644 index 78d6f74df1..0000000000 --- a/tools/shell/src/shellMain.c +++ /dev/null @@ -1,671 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define __USE_XOPEN -#include "os.h" -#include "shell.h" -#include "tglobal.h" -#include "shellCommand.h" -#include "tbase64.h" -#include "tlog.h" -#include "version.h" - -#include -#include -#include - -#define OPT_ABORT 1 /* abort */ - - -int indicator = 1; - -void insertChar(Command *cmd, char *c, int size); -const char *argp_program_version = version; -const char *argp_program_bug_address = ""; -static char doc[] = ""; -static char args_doc[] = ""; - -TdThread pid; -static tsem_t cancelSem; - -static struct argp_option options[] = { - {"host", 'h', "HOST", 0, "TDengine server FQDN to connect. The default host is localhost."}, - {"password", 'p', 0, 0, "The password to use when connecting to the server."}, - {"port", 'P', "PORT", 0, "The TCP/IP port number to use for the connection."}, - {"user", 'u', "USER", 0, "The user name to use when connecting to the server."}, - {"auth", 'A', "Auth", 0, "The auth string to use when connecting to the server."}, - {"config-dir", 'c', "CONFIG_DIR", 0, "Configuration directory."}, - {"dump-config", 'C', 0, 0, "Dump configuration."}, - {"commands", 's', "COMMANDS", 0, "Commands to run without enter the shell."}, - {"raw-time", 'r', 0, 0, "Output time as uint64_t."}, - {"file", 'f', "FILE", 0, "Script to run without enter the shell."}, - {"directory", 'D', "DIRECTORY", 0, "Use multi-thread to import all SQL files in the directory separately."}, - {"thread", 'T', "THREADNUM", 0, "Number of threads when using multi-thread to import data."}, - {"check", 'k', "CHECK", 0, "Check tables."}, - {"database", 'd', "DATABASE", 0, "Database to use when connecting to the server."}, - {"timezone", 'z', "TIMEZONE", 0, "Time zone of the shell, default is local."}, - {"netrole", 'n', "NETROLE", 0, "Net role when network connectivity test, default is startup, options: client|server|rpc|startup|sync|speen|fqdn."}, - {"pktlen", 'l', "PKTLEN", 0, "Packet length used for net test, default is 1000 bytes."}, - {"pktnum", 'N', "PKTNUM", 0, "Packet numbers used for net test, default is 100."}, - {"pkttype", 'S', "PKTTYPE", 0, "Packet type used for net test, default is TCP."}, - {0}}; - -static error_t parse_opt(int key, char *arg, struct argp_state *state) { - /* Get the input argument from argp_parse, which we - know is a pointer to our arguments structure. */ - SShellArguments *arguments = state->input; - wordexp_t full_path; - - switch (key) { - case 'h': - arguments->host = arg; - break; - case 'p': - break; - case 'P': - if (arg) { - arguments->port = atoi(arg); - } else { - fprintf(stderr, "Invalid port\n"); - return -1; - } - - break; - case 'z': - arguments->timezone = arg; - break; - case 'u': - arguments->user = arg; - break; - case 'A': - arguments->auth = arg; - break; - case 'c': - if (wordexp(arg, &full_path, 0) != 0) { - fprintf(stderr, "Invalid path %s\n", arg); - return -1; - } - if (strlen(full_path.we_wordv[0]) >= TSDB_FILENAME_LEN) { - fprintf(stderr, "config file path: %s overflow max len %d\n", full_path.we_wordv[0], TSDB_FILENAME_LEN - 1); - wordfree(&full_path); - return -1; - } - tstrncpy(configDir, full_path.we_wordv[0], TSDB_FILENAME_LEN); - wordfree(&full_path); - break; - case 'C': - arguments->dump_config = true; - break; - case 's': - arguments->commands = arg; - break; - case 'r': - arguments->is_raw_time = true; - break; - case 'f': - if ((0 == strlen(arg)) || (wordexp(arg, &full_path, 0) != 0)) { - fprintf(stderr, "Invalid path %s\n", arg); - return -1; - } - tstrncpy(arguments->file, full_path.we_wordv[0], TSDB_FILENAME_LEN); - wordfree(&full_path); - break; - case 'D': - if (wordexp(arg, &full_path, 0) != 0) { - fprintf(stderr, "Invalid path %s\n", arg); - return -1; - } - tstrncpy(arguments->dir, full_path.we_wordv[0], TSDB_FILENAME_LEN); - wordfree(&full_path); - break; - case 'T': - if (arg) { - arguments->threadNum = atoi(arg); - } else { - fprintf(stderr, "Invalid number of threads\n"); - return -1; - } - break; - case 'k': - arguments->check = atoi(arg); - break; - case 'd': - arguments->database = arg; - break; - case 'n': - arguments->netTestRole = arg; - break; - case 'l': - if (arg) { - arguments->pktLen = atoi(arg); - } else { - fprintf(stderr, "Invalid packet length\n"); - return -1; - } - break; - case 'N': - if (arg) { - arguments->pktNum = atoi(arg); - } else { - fprintf(stderr, "Invalid packet number\n"); - return -1; - } - break; - case 'S': - arguments->pktType = arg; - break; - case OPT_ABORT: - arguments->abort = 1; - break; - default: - return ARGP_ERR_UNKNOWN; - } - return 0; -} - -/* Our argp parser. */ -static struct argp argp = {options, parse_opt, args_doc, doc}; - -char LINUXCLIENT_VERSION[] = "Welcome to the TDengine shell from %s, Client Version:%s\n" - "Copyright (c) 2020 by TAOS Data, Inc. All rights reserved.\n\n"; -char g_password[SHELL_MAX_PASSWORD_LEN]; - -static void parse_args( - int argc, char *argv[], SShellArguments *arguments) { - for (int i = 1; i < argc; i++) { - if ((strncmp(argv[i], "-p", 2) == 0) - || (strncmp(argv[i], "--password", 10) == 0)) { - printf(LINUXCLIENT_VERSION, tsOsName, taos_get_client_info()); - if ((strlen(argv[i]) == 2) - || (strncmp(argv[i], "--password", 10) == 0)) { - printf("Enter password: "); - taosSetConsoleEcho(false); - if (scanf("%20s", g_password) > 1) { - fprintf(stderr, "password reading error\n"); - } - taosSetConsoleEcho(true); - if (EOF == getchar()) { - fprintf(stderr, "getchar() return EOF\n"); - } - } else { - tstrncpy(g_password, (char *)(argv[i] + 2), SHELL_MAX_PASSWORD_LEN); - strcpy(argv[i], "-p"); - } - arguments->password = g_password; - arguments->is_use_passwd = true; - } - } -} - -void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) { - static char verType[32] = {0}; - sprintf(verType, "version: %s\n", version); - - argp_program_version = verType; - - if (argc > 1) { - parse_args(argc, argv, arguments); - } - - argp_parse(&argp, argc, argv, 0, 0, arguments); - if (arguments->abort) { - #ifndef _ALPINE - #if 0 - error(10, 0, "ABORTED"); - #endif - #else - abort(); - #endif - } -} - -int32_t shellReadCommand(TAOS *con, char *command) { - unsigned hist_counter = history.hend; - char utf8_array[10] = "\0"; - Command cmd; - memset(&cmd, 0, sizeof(cmd)); - cmd.buffer = (char *)calloc(1, MAX_COMMAND_SIZE); - cmd.command = (char *)calloc(1, MAX_COMMAND_SIZE); - showOnScreen(&cmd); - - // Read input. - char c; - while (1) { - c = (char)getchar(); // getchar() return an 'int' value - - if (c == EOF) { - return c; - } - - if (c < 0) { // For UTF-8 - int count = countPrefixOnes(c); - utf8_array[0] = c; - for (int k = 1; k < count; k++) { - c = (char)getchar(); - utf8_array[k] = c; - } - insertChar(&cmd, utf8_array, count); - } else if (c < '\033') { - // Ctrl keys. TODO: Implement ctrl combinations - switch (c) { - case 1: // ctrl A - positionCursorHome(&cmd); - break; - case 3: - printf("\n"); - resetCommand(&cmd, ""); - kill(0, SIGINT); - break; - case 4: // EOF or Ctrl+D - printf("\n"); - taos_close(con); - // write the history - write_history(); - exitShell(); - break; - case 5: // ctrl E - positionCursorEnd(&cmd); - break; - case 8: - backspaceChar(&cmd); - break; - case '\n': - case '\r': - printf("\n"); - if (isReadyGo(&cmd)) { - sprintf(command, "%s%s", cmd.buffer, cmd.command); - tfree(cmd.buffer); - tfree(cmd.command); - return 0; - } else { - updateBuffer(&cmd); - } - break; - case 11: // Ctrl + K; - clearLineAfter(&cmd); - break; - case 12: // Ctrl + L; - system("clear"); - showOnScreen(&cmd); - break; - case 21: // Ctrl + U; - clearLineBefore(&cmd); - break; - } - } else if (c == '\033') { - c = (char)getchar(); - switch (c) { - case '[': - c = (char)getchar(); - switch (c) { - case 'A': // Up arrow - if (hist_counter != history.hstart) { - hist_counter = (hist_counter + MAX_HISTORY_SIZE - 1) % MAX_HISTORY_SIZE; - resetCommand(&cmd, (history.hist[hist_counter] == NULL) ? "" : history.hist[hist_counter]); - } - break; - case 'B': // Down arrow - if (hist_counter != history.hend) { - int next_hist = (hist_counter + 1) % MAX_HISTORY_SIZE; - - if (next_hist != history.hend) { - resetCommand(&cmd, (history.hist[next_hist] == NULL) ? "" : history.hist[next_hist]); - } else { - resetCommand(&cmd, ""); - } - hist_counter = next_hist; - } - break; - case 'C': // Right arrow - moveCursorRight(&cmd); - break; - case 'D': // Left arrow - moveCursorLeft(&cmd); - break; - case '1': - if ((c = (char)getchar()) == '~') { - // Home key - positionCursorHome(&cmd); - } - break; - case '2': - if ((c = (char)getchar()) == '~') { - // Insert key - } - break; - case '3': - if ((c = (char)getchar()) == '~') { - // Delete key - deleteChar(&cmd); - } - break; - case '4': - if ((c = (char)getchar()) == '~') { - // End key - positionCursorEnd(&cmd); - } - break; - case '5': - if ((c = (char)getchar()) == '~') { - // Page up key - } - break; - case '6': - if ((c = (char)getchar()) == '~') { - // Page down key - } - break; - case 72: - // Home key - positionCursorHome(&cmd); - break; - case 70: - // End key - positionCursorEnd(&cmd); - break; - } - break; - } - } else if (c == 0x7f) { - // press delete key - backspaceChar(&cmd); - } else { - insertChar(&cmd, &c, 1); - } - } - - return 0; -} - -void *shellLoopQuery(void *arg) { - if (indicator) { - getOldTerminalMode(); - indicator = 0; - } - - TAOS *con = (TAOS *)arg; - - setThreadName("shellLoopQuery"); - - taosThreadCleanupPush(cleanup_handler, NULL); - - char *command = malloc(MAX_COMMAND_SIZE); - if (command == NULL){ - uError("failed to malloc command"); - return NULL; - } - - int32_t err = 0; - - do { - // Read command from shell. - memset(command, 0, MAX_COMMAND_SIZE); - setTerminalMode(); - err = shellReadCommand(con, command); - if (err) { - break; - } - resetTerminalMode(); - } while (shellRunCommand(con, command) == 0); - - tfree(command); - exitShell(); - - taosThreadCleanupPop(1); - - return NULL; -} - -void get_history_path(char *_history) { snprintf(_history, TSDB_FILENAME_LEN, "%s/%s", getenv("HOME"), HISTORY_FILE); } - -void clearScreen(int ecmd_pos, int cursor_pos) { - struct winsize w; - if (ioctl(0, TIOCGWINSZ, &w) < 0 || w.ws_col == 0 || w.ws_row == 0) { - //fprintf(stderr, "No stream device, and use default value(col 120, row 30)\n"); - w.ws_col = 120; - w.ws_row = 30; - } - - int cursor_x = cursor_pos / w.ws_col; - int cursor_y = cursor_pos % w.ws_col; - int command_x = ecmd_pos / w.ws_col; - positionCursor(cursor_y, LEFT); - positionCursor(command_x - cursor_x, DOWN); - fprintf(stdout, "\033[2K"); - for (int i = 0; i < command_x; i++) { - positionCursor(1, UP); - fprintf(stdout, "\033[2K"); - } - fflush(stdout); -} - -void showOnScreen(Command *cmd) { - struct winsize w; - if (ioctl(0, TIOCGWINSZ, &w) < 0 || w.ws_col == 0 || w.ws_row == 0) { - //fprintf(stderr, "No stream device\n"); - w.ws_col = 120; - w.ws_row = 30; - } - - TdWchar wc; - int size = 0; - - // Print out the command. - char *total_string = malloc(MAX_COMMAND_SIZE); - memset(total_string, '\0', MAX_COMMAND_SIZE); - if (strcmp(cmd->buffer, "") == 0) { - sprintf(total_string, "%s%s", PROMPT_HEADER, cmd->command); - } else { - sprintf(total_string, "%s%s", CONTINUE_PROMPT, cmd->command); - } - - int remain_column = w.ws_col; - /* size = cmd->commandSize + prompt_size; */ - for (char *str = total_string; size < cmd->commandSize + prompt_size;) { - int ret = taosMbToWchar(&wc, str, MB_CUR_MAX); - if (ret < 0) break; - size += ret; - /* assert(size >= 0); */ - int width = taosWcharWidth(wc); - if (remain_column > width) { - printf("%lc", wc); - remain_column -= width; - } else { - if (remain_column == width) { - printf("%lc\n\r", wc); - remain_column = w.ws_col; - } else { - printf("\n\r%lc", wc); - remain_column = w.ws_col - width; - } - } - - str = total_string + size; - } - - free(total_string); - /* for (int i = 0; i < size; i++){ */ - /* char c = total_string[i]; */ - /* if (k % w.ws_col == 0) { */ - /* printf("%c\n\r", c); */ - /* } */ - /* else { */ - /* printf("%c", c); */ - /* } */ - /* k += 1; */ - /* } */ - - // Position the cursor - int cursor_pos = cmd->screenOffset + prompt_size; - int ecmd_pos = cmd->endOffset + prompt_size; - - int cursor_x = cursor_pos / w.ws_col; - int cursor_y = cursor_pos % w.ws_col; - // int cursor_y = cursor % w.ws_col; - int command_x = ecmd_pos / w.ws_col; - int command_y = ecmd_pos % w.ws_col; - // int command_y = (command.size() + prompt_size) % w.ws_col; - positionCursor(command_y, LEFT); - positionCursor(command_x, UP); - positionCursor(cursor_x, DOWN); - positionCursor(cursor_y, RIGHT); - fflush(stdout); -} - -void cleanup_handler(void *arg) { resetTerminalMode(); } - -void exitShell() { - /*int32_t ret =*/ resetTerminalMode(); - taos_cleanup(); - exit(EXIT_SUCCESS); -} -void shellQueryInterruptHandler(int32_t signum, void *sigInfo, void *context) { - tsem_post(&cancelSem); -} - -void *cancelHandler(void *arg) { - setThreadName("cancelHandler"); - - while (1) { - if (tsem_wait(&cancelSem) != 0) { - taosMsleep(10); - continue; - } - -#ifdef LINUX -#if 0 - int64_t rid = atomic_val_compare_exchange_64(&result, result, 0); - SSqlObj* pSql = taosAcquireRef(tscObjRef, rid); - taos_stop_query(pSql); - taosReleaseRef(tscObjRef, rid); -#endif -#else - resetTerminalMode(); - printf("\nReceive ctrl+c or other signal, quit shell.\n"); - exit(0); -#endif - resetTerminalMode(); - printf("\nReceive ctrl+c or other signal, quit shell.\n"); - exit(0); - } - - return NULL; -} - -int checkVersion() { - if (sizeof(int8_t) != 1) { - printf("taos int8 size is %d(!= 1)", (int)sizeof(int8_t)); - return 0; - } - if (sizeof(int16_t) != 2) { - printf("taos int16 size is %d(!= 2)", (int)sizeof(int16_t)); - return 0; - } - if (sizeof(int32_t) != 4) { - printf("taos int32 size is %d(!= 4)", (int)sizeof(int32_t)); - return 0; - } - if (sizeof(int64_t) != 8) { - printf("taos int64 size is %d(!= 8)", (int)sizeof(int64_t)); - return 0; - } - return 1; -} - -// Global configurations -SShellArguments args = {.host = NULL, -#ifndef TD_WINDOWS - .password = NULL, -#endif - .user = NULL, - .database = NULL, - .timezone = NULL, - .is_raw_time = false, - .is_use_passwd = false, - .dump_config = false, - .file = "\0", - .dir = "\0", - .threadNum = 5, - .commands = NULL, - .pktLen = 1000, - .pktNum = 100, - .pktType = "TCP", - .netTestRole = NULL}; - -/* - * Main function. - */ -int main(int argc, char *argv[]) { - /*setlocale(LC_ALL, "en_US.UTF-8"); */ - - if (!checkVersion()) { - exit(EXIT_FAILURE); - } - - shellParseArgument(argc, argv, &args); - -#if 0 - if (args.dump_config) { - taosInitGlobalCfg(); - taosReadGlobalLogCfg(); - - if (taosReadGlobalCfg() ! =0) { - printf("TDengine read global config failed"); - exit(EXIT_FAILURE); - } - - taosDumpGlobalCfg(); - exit(0); - } - - if (args.netTestRole && args.netTestRole[0] != 0) { - if (taos_init()) { - printf("Failed to init taos"); - exit(EXIT_FAILURE); - } - taosNetTest(args.netTestRole, args.host, args.port, args.pktLen, args.pktNum, args.pktType); - exit(0); - } -#endif - - /* Initialize the shell */ - TAOS *con = shellInit(&args); - if (con == NULL) { - exit(EXIT_FAILURE); - } - - if (tsem_init(&cancelSem, 0, 0) != 0) { - printf("failed to create cancel semphore\n"); - exit(EXIT_FAILURE); - } - - TdThread spid; - taosThreadCreate(&spid, NULL, cancelHandler, NULL); - - /* Interrupt handler. */ - taosSetSignal(SIGTERM, shellQueryInterruptHandler); - taosSetSignal(SIGINT, shellQueryInterruptHandler); - taosSetSignal(SIGHUP, shellQueryInterruptHandler); - taosSetSignal(SIGABRT, shellQueryInterruptHandler); - - /* Get grant information */ - shellGetGrantInfo(con); - - /* Loop to query the input. */ - while (1) { - taosThreadCreate(&pid, NULL, shellLoopQuery, con); - taosThreadJoin(pid, NULL); - } -} From 044db54e741f516e8cd62efcb6402f016a650349 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Wed, 23 Mar 2022 06:25:24 +0800 Subject: [PATCH 14/17] refactor --- source/dnode/mgmt/vnode/src/vmInt.c | 2 +- source/dnode/vnode/src/inc/tsdbLog.h | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/source/dnode/mgmt/vnode/src/vmInt.c b/source/dnode/mgmt/vnode/src/vmInt.c index a324c60618..746fcd4855 100644 --- a/source/dnode/mgmt/vnode/src/vmInt.c +++ b/source/dnode/mgmt/vnode/src/vmInt.c @@ -343,7 +343,7 @@ void vmGetMgmtFp(SMgmtWrapper *pWrapper) { mgmtFp.requiredFp = vmRequire; vmInitMsgHandles(pWrapper); - pWrapper->name = "vnodes"; + pWrapper->name = "vnode"; pWrapper->fp = mgmtFp; } diff --git a/source/dnode/vnode/src/inc/tsdbLog.h b/source/dnode/vnode/src/inc/tsdbLog.h index 56dc8ab2a0..3afe628198 100644 --- a/source/dnode/vnode/src/inc/tsdbLog.h +++ b/source/dnode/vnode/src/inc/tsdbLog.h @@ -24,12 +24,12 @@ extern "C" { extern int32_t tsdbDebugFlag; -#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) -#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TDB ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) -#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TDB WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0) -#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TDB ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) -#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TDB ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0) -#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) +#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TSDB FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) +#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TSDB ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) +#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TSDB WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0) +#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TSDB ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) +#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSDB ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0) +#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) #ifdef __cplusplus } From d4cc6c1fe7e888bfcb55fc50446be4921c68be5c Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Wed, 23 Mar 2022 07:58:07 +0800 Subject: [PATCH 15/17] add bSma/rSma param --- include/common/tmsg.h | 36 +++++++--- source/common/src/tmsg.c | 84 ++++++++++++++++++++++- source/dnode/vnode/src/meta/metaBDBImpl.c | 2 +- source/dnode/vnode/test/tsdbSmaTest.cpp | 55 +++++++++++++++ 4 files changed, 166 insertions(+), 11 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 560490569a..583496a4c6 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1363,28 +1363,48 @@ typedef struct { int64_t tuid; } SDDropTopicReq; +typedef struct { + float xFilesFactor; + int8_t delayUnit; + int8_t nFuncIds; + int32_t* pFuncIds; + int64_t delay; +} SRSmaParam; + typedef struct SVCreateTbReq { int64_t ver; // use a general definition char* dbFName; char* name; uint32_t ttl; uint32_t keep; - uint8_t type; + union { + uint8_t info; + struct { + uint8_t rollup : 1; // 1 means rollup sma + uint8_t type : 7; + }; + }; union { struct { - tb_uid_t suid; - uint32_t nCols; - SSchema* pSchema; - uint32_t nTagCols; - SSchema* pTagSchema; + tb_uid_t suid; + uint32_t nCols; + SSchema* pSchema; + uint32_t nTagCols; + SSchema* pTagSchema; + col_id_t nBSmaCols; + col_id_t* pBSmaCols; + SRSmaParam* pRSmaParam; } stbCfg; struct { tb_uid_t suid; SKVRow pTag; } ctbCfg; struct { - uint32_t nCols; - SSchema* pSchema; + uint32_t nCols; + SSchema* pSchema; + col_id_t nBSmaCols; + col_id_t* pBSmaCols; + SRSmaParam* pRSmaParam; } ntbCfg; }; } SVCreateTbReq, SVUpdateTbReq; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 47872b89d5..1ea00566f4 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -290,7 +290,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { tlen += taosEncodeString(buf, pReq->name); tlen += taosEncodeFixedU32(buf, pReq->ttl); tlen += taosEncodeFixedU32(buf, pReq->keep); - tlen += taosEncodeFixedU8(buf, pReq->type); + tlen += taosEncodeFixedU8(buf, pReq->info); switch (pReq->type) { case TD_SUPER_TABLE: @@ -309,6 +309,20 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pTagSchema[i].bytes); tlen += taosEncodeString(buf, pReq->stbCfg.pTagSchema[i].name); } + tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nBSmaCols); + for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) { + tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pBSmaCols[i]); + } + if(pReq->rollup && NULL != pReq->stbCfg.pRSmaParam) { + SRSmaParam *param = pReq->stbCfg.pRSmaParam; + tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor); + tlen += taosEncodeFixedI8(buf, param->delayUnit); + tlen += taosEncodeFixedI8(buf, param->nFuncIds); + for(int8_t i=0; i< param->nFuncIds; ++i) { + tlen += taosEncodeFixedI32(buf, param->pFuncIds[i]); + } + tlen += taosEncodeFixedI64(buf, param->delay); + } break; case TD_CHILD_TABLE: tlen += taosEncodeFixedI64(buf, pReq->ctbCfg.suid); @@ -322,6 +336,20 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { tlen += taosEncodeFixedI32(buf, pReq->ntbCfg.pSchema[i].bytes); tlen += taosEncodeString(buf, pReq->ntbCfg.pSchema[i].name); } + tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nBSmaCols); + for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) { + tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pBSmaCols[i]); + } + if(pReq->rollup && NULL != pReq->stbCfg.pRSmaParam) { + SRSmaParam *param = pReq->stbCfg.pRSmaParam; + tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor); + tlen += taosEncodeFixedI8(buf, param->delayUnit); + tlen += taosEncodeFixedI8(buf, param->nFuncIds); + for(int8_t i=0; i< param->nFuncIds; ++i) { + tlen += taosEncodeFixedI32(buf, param->pFuncIds[i]); + } + tlen += taosEncodeFixedI64(buf, param->delay); + } break; default: ASSERT(0); @@ -335,7 +363,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { buf = taosDecodeString(buf, &(pReq->name)); buf = taosDecodeFixedU32(buf, &(pReq->ttl)); buf = taosDecodeFixedU32(buf, &(pReq->keep)); - buf = taosDecodeFixedU8(buf, &(pReq->type)); + buf = taosDecodeFixedU8(buf, &(pReq->info)); switch (pReq->type) { case TD_SUPER_TABLE: @@ -356,6 +384,32 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { buf = taosDecodeFixedI32(buf, &pReq->stbCfg.pTagSchema[i].bytes); buf = taosDecodeStringTo(buf, pReq->stbCfg.pTagSchema[i].name); } + buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nBSmaCols)); + if(pReq->stbCfg.nBSmaCols > 0) { + pReq->stbCfg.pBSmaCols = (col_id_t *)malloc(pReq->stbCfg.nBSmaCols * sizeof(col_id_t)); + for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) { + buf = taosDecodeFixedI16(buf, pReq->stbCfg.pBSmaCols + i); + } + } else { + pReq->stbCfg.pBSmaCols = NULL; + } + if(pReq->rollup) { + pReq->stbCfg.pRSmaParam = (SRSmaParam *)malloc(sizeof(SRSmaParam)); + SRSmaParam *param = pReq->stbCfg.pRSmaParam; + buf = taosDecodeFixedU32(buf, (uint32_t*)¶m->xFilesFactor); + buf = taosDecodeFixedI8(buf, ¶m->delayUnit); + buf = taosDecodeFixedI8(buf, ¶m->nFuncIds); + if(param->nFuncIds > 0) { + for (int8_t i = 0; i< param->nFuncIds; ++i) { + buf = taosDecodeFixedI32(buf, param->pFuncIds + i); + } + } else { + param->pFuncIds = NULL; + } + buf = taosDecodeFixedI64(buf, ¶m->delay); + } else { + pReq->stbCfg.pRSmaParam = NULL; + } break; case TD_CHILD_TABLE: buf = taosDecodeFixedI64(buf, &pReq->ctbCfg.suid); @@ -370,6 +424,32 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { buf = taosDecodeFixedI32(buf, &pReq->ntbCfg.pSchema[i].bytes); buf = taosDecodeStringTo(buf, pReq->ntbCfg.pSchema[i].name); } + buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nBSmaCols)); + if(pReq->stbCfg.nBSmaCols > 0) { + pReq->stbCfg.pBSmaCols = (col_id_t *)malloc(pReq->stbCfg.nBSmaCols * sizeof(col_id_t)); + for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) { + buf = taosDecodeFixedI16(buf, pReq->stbCfg.pBSmaCols + i); + } + } else { + pReq->stbCfg.pBSmaCols = NULL; + } + if(pReq->rollup) { + pReq->stbCfg.pRSmaParam = (SRSmaParam *)malloc(sizeof(SRSmaParam)); + SRSmaParam *param = pReq->stbCfg.pRSmaParam; + buf = taosDecodeFixedU32(buf, (uint32_t*)¶m->xFilesFactor); + buf = taosDecodeFixedI8(buf, ¶m->delayUnit); + buf = taosDecodeFixedI8(buf, ¶m->nFuncIds); + if(param->nFuncIds > 0) { + for (int8_t i = 0; i< param->nFuncIds; ++i) { + buf = taosDecodeFixedI32(buf, param->pFuncIds + i); + } + } else { + param->pFuncIds = NULL; + } + buf = taosDecodeFixedI64(buf, ¶m->delay); + } else { + pReq->stbCfg.pRSmaParam = NULL; + } break; default: ASSERT(0); diff --git a/source/dnode/vnode/src/meta/metaBDBImpl.c b/source/dnode/vnode/src/meta/metaBDBImpl.c index 3eb9a480ac..0a93ea67b3 100644 --- a/source/dnode/vnode/src/meta/metaBDBImpl.c +++ b/source/dnode/vnode/src/meta/metaBDBImpl.c @@ -527,7 +527,7 @@ static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg) { buf = taosDecodeString(buf, &(pTbCfg->name)); buf = taosDecodeFixedU32(buf, &(pTbCfg->ttl)); buf = taosDecodeFixedU32(buf, &(pTbCfg->keep)); - buf = taosDecodeFixedU8(buf, &(pTbCfg->type)); + buf = taosDecodeFixedU8(buf, &(pTbCfg->info)); if (pTbCfg->type == META_SUPER_TABLE) { SSchemaWrapper sw; diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp index b9bf432a72..3c51ad5d71 100644 --- a/source/dnode/vnode/test/tsdbSmaTest.cpp +++ b/source/dnode/vnode/test/tsdbSmaTest.cpp @@ -33,6 +33,61 @@ int main(int argc, char **argv) { return RUN_ALL_TESTS(); } +TEST(testCase, unionEncodeDecodeTest) { + typedef struct { + union { + uint8_t info; + struct { + uint8_t rollup : 1; // 1 means rollup sma + uint8_t type : 7; + }; + }; + col_id_t nBSmaCols; + col_id_t* pBSmaCols; + } SUnionTest; + + SUnionTest sut = {0}; + sut.rollup = 1; + sut.type = 1; + + sut.nBSmaCols = 2; + sut.pBSmaCols = (col_id_t*)malloc(sut.nBSmaCols * sizeof(col_id_t)); + for (col_id_t i = 0; i < sut.nBSmaCols; ++i) { + sut.pBSmaCols[i] = i + 100; + } + + void* buf = malloc(1024); + void * pBuf = buf; + int32_t tlen = 0; + tlen += taosEncodeFixedU8(&buf, sut.info); + tlen += taosEncodeFixedI16(&buf, sut.nBSmaCols); + for (col_id_t i = 0; i < sut.nBSmaCols; ++i) { + tlen += taosEncodeFixedI16(&buf, sut.pBSmaCols[i]); + } + + SUnionTest dut = {0}; + pBuf = taosDecodeFixedU8(pBuf, &dut.info); + pBuf = taosDecodeFixedI16(pBuf, &dut.nBSmaCols); + if(dut.nBSmaCols > 0) { + dut.pBSmaCols = (col_id_t*)malloc(dut.nBSmaCols * sizeof(col_id_t)); + for(col_id_t i=0; i < dut.nBSmaCols; ++i) { + pBuf = taosDecodeFixedI16(pBuf, dut.pBSmaCols + i); + } + } else { + dut.pBSmaCols = NULL; + } + + printf("sut.rollup=%" PRIu8 ", type=%" PRIu8 ", info=%" PRIu8 "\n", sut.rollup, sut.type, sut.info); + printf("dut.rollup=%" PRIu8 ", type=%" PRIu8 ", info=%" PRIu8 "\n", dut.rollup, dut.type, dut.info); + + ASSERT_EQ(sut.rollup, dut.rollup); + ASSERT_EQ(sut.type, dut.type); + ASSERT_EQ(sut.nBSmaCols, dut.nBSmaCols); + for (col_id_t i = 0; i< sut.nBSmaCols; ++i) { + ASSERT_EQ(*(col_id_t*)(sut.pBSmaCols + i), sut.pBSmaCols[i]); + ASSERT_EQ(*(col_id_t*)(sut.pBSmaCols + i), dut.pBSmaCols[i]); + } +} #if 1 TEST(testCase, tSma_Meta_Encode_Decode_Test) { // encode From ea034335bba3db3432438ee38336ce9ce82b703b Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Wed, 23 Mar 2022 09:36:22 +0800 Subject: [PATCH 16/17] encode info --- source/dnode/vnode/src/meta/metaBDBImpl.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/meta/metaBDBImpl.c b/source/dnode/vnode/src/meta/metaBDBImpl.c index 0a93ea67b3..2dd8386d7a 100644 --- a/source/dnode/vnode/src/meta/metaBDBImpl.c +++ b/source/dnode/vnode/src/meta/metaBDBImpl.c @@ -507,7 +507,7 @@ static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg) { tsize += taosEncodeString(buf, pTbCfg->name); tsize += taosEncodeFixedU32(buf, pTbCfg->ttl); tsize += taosEncodeFixedU32(buf, pTbCfg->keep); - tsize += taosEncodeFixedU8(buf, pTbCfg->type); + tsize += taosEncodeFixedU8(buf, pTbCfg->info); if (pTbCfg->type == META_SUPER_TABLE) { SSchemaWrapper sw = {.nCols = pTbCfg->stbCfg.nTagCols, .pSchema = pTbCfg->stbCfg.pTagSchema}; From b4bd6b6bb89c231fac014eb77a31f5926aba8312 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 23 Mar 2022 10:10:40 +0800 Subject: [PATCH 17/17] [td-13039] fix compiler error. --- tools/shell/src/shellMain.c | 671 ++++++++++++++++++++++++++++++++++++ 1 file changed, 671 insertions(+) create mode 100644 tools/shell/src/shellMain.c diff --git a/tools/shell/src/shellMain.c b/tools/shell/src/shellMain.c new file mode 100644 index 0000000000..78d6f74df1 --- /dev/null +++ b/tools/shell/src/shellMain.c @@ -0,0 +1,671 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define __USE_XOPEN +#include "os.h" +#include "shell.h" +#include "tglobal.h" +#include "shellCommand.h" +#include "tbase64.h" +#include "tlog.h" +#include "version.h" + +#include +#include +#include + +#define OPT_ABORT 1 /* abort */ + + +int indicator = 1; + +void insertChar(Command *cmd, char *c, int size); +const char *argp_program_version = version; +const char *argp_program_bug_address = ""; +static char doc[] = ""; +static char args_doc[] = ""; + +TdThread pid; +static tsem_t cancelSem; + +static struct argp_option options[] = { + {"host", 'h', "HOST", 0, "TDengine server FQDN to connect. The default host is localhost."}, + {"password", 'p', 0, 0, "The password to use when connecting to the server."}, + {"port", 'P', "PORT", 0, "The TCP/IP port number to use for the connection."}, + {"user", 'u', "USER", 0, "The user name to use when connecting to the server."}, + {"auth", 'A', "Auth", 0, "The auth string to use when connecting to the server."}, + {"config-dir", 'c', "CONFIG_DIR", 0, "Configuration directory."}, + {"dump-config", 'C', 0, 0, "Dump configuration."}, + {"commands", 's', "COMMANDS", 0, "Commands to run without enter the shell."}, + {"raw-time", 'r', 0, 0, "Output time as uint64_t."}, + {"file", 'f', "FILE", 0, "Script to run without enter the shell."}, + {"directory", 'D', "DIRECTORY", 0, "Use multi-thread to import all SQL files in the directory separately."}, + {"thread", 'T', "THREADNUM", 0, "Number of threads when using multi-thread to import data."}, + {"check", 'k', "CHECK", 0, "Check tables."}, + {"database", 'd', "DATABASE", 0, "Database to use when connecting to the server."}, + {"timezone", 'z', "TIMEZONE", 0, "Time zone of the shell, default is local."}, + {"netrole", 'n', "NETROLE", 0, "Net role when network connectivity test, default is startup, options: client|server|rpc|startup|sync|speen|fqdn."}, + {"pktlen", 'l', "PKTLEN", 0, "Packet length used for net test, default is 1000 bytes."}, + {"pktnum", 'N', "PKTNUM", 0, "Packet numbers used for net test, default is 100."}, + {"pkttype", 'S', "PKTTYPE", 0, "Packet type used for net test, default is TCP."}, + {0}}; + +static error_t parse_opt(int key, char *arg, struct argp_state *state) { + /* Get the input argument from argp_parse, which we + know is a pointer to our arguments structure. */ + SShellArguments *arguments = state->input; + wordexp_t full_path; + + switch (key) { + case 'h': + arguments->host = arg; + break; + case 'p': + break; + case 'P': + if (arg) { + arguments->port = atoi(arg); + } else { + fprintf(stderr, "Invalid port\n"); + return -1; + } + + break; + case 'z': + arguments->timezone = arg; + break; + case 'u': + arguments->user = arg; + break; + case 'A': + arguments->auth = arg; + break; + case 'c': + if (wordexp(arg, &full_path, 0) != 0) { + fprintf(stderr, "Invalid path %s\n", arg); + return -1; + } + if (strlen(full_path.we_wordv[0]) >= TSDB_FILENAME_LEN) { + fprintf(stderr, "config file path: %s overflow max len %d\n", full_path.we_wordv[0], TSDB_FILENAME_LEN - 1); + wordfree(&full_path); + return -1; + } + tstrncpy(configDir, full_path.we_wordv[0], TSDB_FILENAME_LEN); + wordfree(&full_path); + break; + case 'C': + arguments->dump_config = true; + break; + case 's': + arguments->commands = arg; + break; + case 'r': + arguments->is_raw_time = true; + break; + case 'f': + if ((0 == strlen(arg)) || (wordexp(arg, &full_path, 0) != 0)) { + fprintf(stderr, "Invalid path %s\n", arg); + return -1; + } + tstrncpy(arguments->file, full_path.we_wordv[0], TSDB_FILENAME_LEN); + wordfree(&full_path); + break; + case 'D': + if (wordexp(arg, &full_path, 0) != 0) { + fprintf(stderr, "Invalid path %s\n", arg); + return -1; + } + tstrncpy(arguments->dir, full_path.we_wordv[0], TSDB_FILENAME_LEN); + wordfree(&full_path); + break; + case 'T': + if (arg) { + arguments->threadNum = atoi(arg); + } else { + fprintf(stderr, "Invalid number of threads\n"); + return -1; + } + break; + case 'k': + arguments->check = atoi(arg); + break; + case 'd': + arguments->database = arg; + break; + case 'n': + arguments->netTestRole = arg; + break; + case 'l': + if (arg) { + arguments->pktLen = atoi(arg); + } else { + fprintf(stderr, "Invalid packet length\n"); + return -1; + } + break; + case 'N': + if (arg) { + arguments->pktNum = atoi(arg); + } else { + fprintf(stderr, "Invalid packet number\n"); + return -1; + } + break; + case 'S': + arguments->pktType = arg; + break; + case OPT_ABORT: + arguments->abort = 1; + break; + default: + return ARGP_ERR_UNKNOWN; + } + return 0; +} + +/* Our argp parser. */ +static struct argp argp = {options, parse_opt, args_doc, doc}; + +char LINUXCLIENT_VERSION[] = "Welcome to the TDengine shell from %s, Client Version:%s\n" + "Copyright (c) 2020 by TAOS Data, Inc. All rights reserved.\n\n"; +char g_password[SHELL_MAX_PASSWORD_LEN]; + +static void parse_args( + int argc, char *argv[], SShellArguments *arguments) { + for (int i = 1; i < argc; i++) { + if ((strncmp(argv[i], "-p", 2) == 0) + || (strncmp(argv[i], "--password", 10) == 0)) { + printf(LINUXCLIENT_VERSION, tsOsName, taos_get_client_info()); + if ((strlen(argv[i]) == 2) + || (strncmp(argv[i], "--password", 10) == 0)) { + printf("Enter password: "); + taosSetConsoleEcho(false); + if (scanf("%20s", g_password) > 1) { + fprintf(stderr, "password reading error\n"); + } + taosSetConsoleEcho(true); + if (EOF == getchar()) { + fprintf(stderr, "getchar() return EOF\n"); + } + } else { + tstrncpy(g_password, (char *)(argv[i] + 2), SHELL_MAX_PASSWORD_LEN); + strcpy(argv[i], "-p"); + } + arguments->password = g_password; + arguments->is_use_passwd = true; + } + } +} + +void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) { + static char verType[32] = {0}; + sprintf(verType, "version: %s\n", version); + + argp_program_version = verType; + + if (argc > 1) { + parse_args(argc, argv, arguments); + } + + argp_parse(&argp, argc, argv, 0, 0, arguments); + if (arguments->abort) { + #ifndef _ALPINE + #if 0 + error(10, 0, "ABORTED"); + #endif + #else + abort(); + #endif + } +} + +int32_t shellReadCommand(TAOS *con, char *command) { + unsigned hist_counter = history.hend; + char utf8_array[10] = "\0"; + Command cmd; + memset(&cmd, 0, sizeof(cmd)); + cmd.buffer = (char *)calloc(1, MAX_COMMAND_SIZE); + cmd.command = (char *)calloc(1, MAX_COMMAND_SIZE); + showOnScreen(&cmd); + + // Read input. + char c; + while (1) { + c = (char)getchar(); // getchar() return an 'int' value + + if (c == EOF) { + return c; + } + + if (c < 0) { // For UTF-8 + int count = countPrefixOnes(c); + utf8_array[0] = c; + for (int k = 1; k < count; k++) { + c = (char)getchar(); + utf8_array[k] = c; + } + insertChar(&cmd, utf8_array, count); + } else if (c < '\033') { + // Ctrl keys. TODO: Implement ctrl combinations + switch (c) { + case 1: // ctrl A + positionCursorHome(&cmd); + break; + case 3: + printf("\n"); + resetCommand(&cmd, ""); + kill(0, SIGINT); + break; + case 4: // EOF or Ctrl+D + printf("\n"); + taos_close(con); + // write the history + write_history(); + exitShell(); + break; + case 5: // ctrl E + positionCursorEnd(&cmd); + break; + case 8: + backspaceChar(&cmd); + break; + case '\n': + case '\r': + printf("\n"); + if (isReadyGo(&cmd)) { + sprintf(command, "%s%s", cmd.buffer, cmd.command); + tfree(cmd.buffer); + tfree(cmd.command); + return 0; + } else { + updateBuffer(&cmd); + } + break; + case 11: // Ctrl + K; + clearLineAfter(&cmd); + break; + case 12: // Ctrl + L; + system("clear"); + showOnScreen(&cmd); + break; + case 21: // Ctrl + U; + clearLineBefore(&cmd); + break; + } + } else if (c == '\033') { + c = (char)getchar(); + switch (c) { + case '[': + c = (char)getchar(); + switch (c) { + case 'A': // Up arrow + if (hist_counter != history.hstart) { + hist_counter = (hist_counter + MAX_HISTORY_SIZE - 1) % MAX_HISTORY_SIZE; + resetCommand(&cmd, (history.hist[hist_counter] == NULL) ? "" : history.hist[hist_counter]); + } + break; + case 'B': // Down arrow + if (hist_counter != history.hend) { + int next_hist = (hist_counter + 1) % MAX_HISTORY_SIZE; + + if (next_hist != history.hend) { + resetCommand(&cmd, (history.hist[next_hist] == NULL) ? "" : history.hist[next_hist]); + } else { + resetCommand(&cmd, ""); + } + hist_counter = next_hist; + } + break; + case 'C': // Right arrow + moveCursorRight(&cmd); + break; + case 'D': // Left arrow + moveCursorLeft(&cmd); + break; + case '1': + if ((c = (char)getchar()) == '~') { + // Home key + positionCursorHome(&cmd); + } + break; + case '2': + if ((c = (char)getchar()) == '~') { + // Insert key + } + break; + case '3': + if ((c = (char)getchar()) == '~') { + // Delete key + deleteChar(&cmd); + } + break; + case '4': + if ((c = (char)getchar()) == '~') { + // End key + positionCursorEnd(&cmd); + } + break; + case '5': + if ((c = (char)getchar()) == '~') { + // Page up key + } + break; + case '6': + if ((c = (char)getchar()) == '~') { + // Page down key + } + break; + case 72: + // Home key + positionCursorHome(&cmd); + break; + case 70: + // End key + positionCursorEnd(&cmd); + break; + } + break; + } + } else if (c == 0x7f) { + // press delete key + backspaceChar(&cmd); + } else { + insertChar(&cmd, &c, 1); + } + } + + return 0; +} + +void *shellLoopQuery(void *arg) { + if (indicator) { + getOldTerminalMode(); + indicator = 0; + } + + TAOS *con = (TAOS *)arg; + + setThreadName("shellLoopQuery"); + + taosThreadCleanupPush(cleanup_handler, NULL); + + char *command = malloc(MAX_COMMAND_SIZE); + if (command == NULL){ + uError("failed to malloc command"); + return NULL; + } + + int32_t err = 0; + + do { + // Read command from shell. + memset(command, 0, MAX_COMMAND_SIZE); + setTerminalMode(); + err = shellReadCommand(con, command); + if (err) { + break; + } + resetTerminalMode(); + } while (shellRunCommand(con, command) == 0); + + tfree(command); + exitShell(); + + taosThreadCleanupPop(1); + + return NULL; +} + +void get_history_path(char *_history) { snprintf(_history, TSDB_FILENAME_LEN, "%s/%s", getenv("HOME"), HISTORY_FILE); } + +void clearScreen(int ecmd_pos, int cursor_pos) { + struct winsize w; + if (ioctl(0, TIOCGWINSZ, &w) < 0 || w.ws_col == 0 || w.ws_row == 0) { + //fprintf(stderr, "No stream device, and use default value(col 120, row 30)\n"); + w.ws_col = 120; + w.ws_row = 30; + } + + int cursor_x = cursor_pos / w.ws_col; + int cursor_y = cursor_pos % w.ws_col; + int command_x = ecmd_pos / w.ws_col; + positionCursor(cursor_y, LEFT); + positionCursor(command_x - cursor_x, DOWN); + fprintf(stdout, "\033[2K"); + for (int i = 0; i < command_x; i++) { + positionCursor(1, UP); + fprintf(stdout, "\033[2K"); + } + fflush(stdout); +} + +void showOnScreen(Command *cmd) { + struct winsize w; + if (ioctl(0, TIOCGWINSZ, &w) < 0 || w.ws_col == 0 || w.ws_row == 0) { + //fprintf(stderr, "No stream device\n"); + w.ws_col = 120; + w.ws_row = 30; + } + + TdWchar wc; + int size = 0; + + // Print out the command. + char *total_string = malloc(MAX_COMMAND_SIZE); + memset(total_string, '\0', MAX_COMMAND_SIZE); + if (strcmp(cmd->buffer, "") == 0) { + sprintf(total_string, "%s%s", PROMPT_HEADER, cmd->command); + } else { + sprintf(total_string, "%s%s", CONTINUE_PROMPT, cmd->command); + } + + int remain_column = w.ws_col; + /* size = cmd->commandSize + prompt_size; */ + for (char *str = total_string; size < cmd->commandSize + prompt_size;) { + int ret = taosMbToWchar(&wc, str, MB_CUR_MAX); + if (ret < 0) break; + size += ret; + /* assert(size >= 0); */ + int width = taosWcharWidth(wc); + if (remain_column > width) { + printf("%lc", wc); + remain_column -= width; + } else { + if (remain_column == width) { + printf("%lc\n\r", wc); + remain_column = w.ws_col; + } else { + printf("\n\r%lc", wc); + remain_column = w.ws_col - width; + } + } + + str = total_string + size; + } + + free(total_string); + /* for (int i = 0; i < size; i++){ */ + /* char c = total_string[i]; */ + /* if (k % w.ws_col == 0) { */ + /* printf("%c\n\r", c); */ + /* } */ + /* else { */ + /* printf("%c", c); */ + /* } */ + /* k += 1; */ + /* } */ + + // Position the cursor + int cursor_pos = cmd->screenOffset + prompt_size; + int ecmd_pos = cmd->endOffset + prompt_size; + + int cursor_x = cursor_pos / w.ws_col; + int cursor_y = cursor_pos % w.ws_col; + // int cursor_y = cursor % w.ws_col; + int command_x = ecmd_pos / w.ws_col; + int command_y = ecmd_pos % w.ws_col; + // int command_y = (command.size() + prompt_size) % w.ws_col; + positionCursor(command_y, LEFT); + positionCursor(command_x, UP); + positionCursor(cursor_x, DOWN); + positionCursor(cursor_y, RIGHT); + fflush(stdout); +} + +void cleanup_handler(void *arg) { resetTerminalMode(); } + +void exitShell() { + /*int32_t ret =*/ resetTerminalMode(); + taos_cleanup(); + exit(EXIT_SUCCESS); +} +void shellQueryInterruptHandler(int32_t signum, void *sigInfo, void *context) { + tsem_post(&cancelSem); +} + +void *cancelHandler(void *arg) { + setThreadName("cancelHandler"); + + while (1) { + if (tsem_wait(&cancelSem) != 0) { + taosMsleep(10); + continue; + } + +#ifdef LINUX +#if 0 + int64_t rid = atomic_val_compare_exchange_64(&result, result, 0); + SSqlObj* pSql = taosAcquireRef(tscObjRef, rid); + taos_stop_query(pSql); + taosReleaseRef(tscObjRef, rid); +#endif +#else + resetTerminalMode(); + printf("\nReceive ctrl+c or other signal, quit shell.\n"); + exit(0); +#endif + resetTerminalMode(); + printf("\nReceive ctrl+c or other signal, quit shell.\n"); + exit(0); + } + + return NULL; +} + +int checkVersion() { + if (sizeof(int8_t) != 1) { + printf("taos int8 size is %d(!= 1)", (int)sizeof(int8_t)); + return 0; + } + if (sizeof(int16_t) != 2) { + printf("taos int16 size is %d(!= 2)", (int)sizeof(int16_t)); + return 0; + } + if (sizeof(int32_t) != 4) { + printf("taos int32 size is %d(!= 4)", (int)sizeof(int32_t)); + return 0; + } + if (sizeof(int64_t) != 8) { + printf("taos int64 size is %d(!= 8)", (int)sizeof(int64_t)); + return 0; + } + return 1; +} + +// Global configurations +SShellArguments args = {.host = NULL, +#ifndef TD_WINDOWS + .password = NULL, +#endif + .user = NULL, + .database = NULL, + .timezone = NULL, + .is_raw_time = false, + .is_use_passwd = false, + .dump_config = false, + .file = "\0", + .dir = "\0", + .threadNum = 5, + .commands = NULL, + .pktLen = 1000, + .pktNum = 100, + .pktType = "TCP", + .netTestRole = NULL}; + +/* + * Main function. + */ +int main(int argc, char *argv[]) { + /*setlocale(LC_ALL, "en_US.UTF-8"); */ + + if (!checkVersion()) { + exit(EXIT_FAILURE); + } + + shellParseArgument(argc, argv, &args); + +#if 0 + if (args.dump_config) { + taosInitGlobalCfg(); + taosReadGlobalLogCfg(); + + if (taosReadGlobalCfg() ! =0) { + printf("TDengine read global config failed"); + exit(EXIT_FAILURE); + } + + taosDumpGlobalCfg(); + exit(0); + } + + if (args.netTestRole && args.netTestRole[0] != 0) { + if (taos_init()) { + printf("Failed to init taos"); + exit(EXIT_FAILURE); + } + taosNetTest(args.netTestRole, args.host, args.port, args.pktLen, args.pktNum, args.pktType); + exit(0); + } +#endif + + /* Initialize the shell */ + TAOS *con = shellInit(&args); + if (con == NULL) { + exit(EXIT_FAILURE); + } + + if (tsem_init(&cancelSem, 0, 0) != 0) { + printf("failed to create cancel semphore\n"); + exit(EXIT_FAILURE); + } + + TdThread spid; + taosThreadCreate(&spid, NULL, cancelHandler, NULL); + + /* Interrupt handler. */ + taosSetSignal(SIGTERM, shellQueryInterruptHandler); + taosSetSignal(SIGINT, shellQueryInterruptHandler); + taosSetSignal(SIGHUP, shellQueryInterruptHandler); + taosSetSignal(SIGABRT, shellQueryInterruptHandler); + + /* Get grant information */ + shellGetGrantInfo(con); + + /* Loop to query the input. */ + while (1) { + taosThreadCreate(&pid, NULL, shellLoopQuery, con); + taosThreadJoin(pid, NULL); + } +}