diff --git a/include/common/ttypes.h b/include/common/ttypes.h index 9ff315fc2e..bfd6a75c3a 100644 --- a/include/common/ttypes.h +++ b/include/common/ttypes.h @@ -290,6 +290,7 @@ typedef struct { (IS_SIGNED_NUMERIC_TYPE(_t) || (_t) == (TSDB_DATA_TYPE_BOOL) || (_t) == (TSDB_DATA_TYPE_TIMESTAMP)) #define IS_CONVERT_AS_UNSIGNED(_t) (IS_UNSIGNED_NUMERIC_TYPE(_t) || (_t) == (TSDB_DATA_TYPE_BOOL)) +#if 0 // TODO remove this function static FORCE_INLINE bool isNull(const void *val, int32_t type) { switch (type) { @@ -325,6 +326,7 @@ static FORCE_INLINE bool isNull(const void *val, int32_t type) { return false; }; } +#endif typedef struct tDataTypeDescriptor { int16_t type; diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 5f2ce66f25..837834f795 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -347,11 +347,6 @@ typedef struct SInsertStmt { uint8_t precision; } SInsertStmt; -typedef enum { - PAYLOAD_TYPE_KV = 0, - PAYLOAD_TYPE_RAW = 1, -} EPayloadType; - typedef struct SVgDataBlocks { SVgroupInfo vg; int32_t numOfTables; // number of tables in current submit block @@ -363,7 +358,6 @@ typedef struct SVnodeModifOpStmt { ENodeType nodeType; ENodeType sqlNodeType; SArray* pDataBlocks; // data block for each vgroup, SArray. - uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert uint32_t insertType; // insert data from [file|sql statement| bound statement] const char* sql; // current sql statement position } SVnodeModifOpStmt; diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index dfcd8c4a79..f05d37765d 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1369,7 +1369,6 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) goto end; } SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot); - nodeStmt->payloadType = PAYLOAD_TYPE_KV; nodeStmt->pDataBlocks = taosArrayInit(1, POINTER_BYTES); SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks)); @@ -1625,7 +1624,6 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { goto end; } SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot); - nodeStmt->payloadType = PAYLOAD_TYPE_KV; int32_t numOfVg = taosHashGetSize(pVgHash); nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES); @@ -1929,7 +1927,6 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) goto end; } SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot); - nodeStmt->payloadType = PAYLOAD_TYPE_KV; int32_t numOfVg = taosHashGetSize(pVgHash); nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES); diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 5b8266b1cb..5da7bebf2a 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -1531,7 +1531,6 @@ static SSmlHandle *smlBuildSmlInfo(STscObj *pTscObj, SRequestObj *request, SMLPr uError("SML:0x%" PRIx64 " create info->pQuery->pRoot error", info->id); goto cleanup; } - ((SVnodeModifOpStmt *)(info->pQuery->pRoot))->payloadType = PAYLOAD_TYPE_KV; if (pTscObj) { info->taos = pTscObj; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 4fd1187fef..47a260c147 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5158,7 +5158,7 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) { char name[TSDB_COL_NAME_LEN] = {0}; char *tmp = NULL; if (tDecodeCStr(pCoder, &tmp) < 0) return -1; - strcpy(name, tmp); + strncpy(name, tmp, TSDB_COL_NAME_LEN - 1); taosArrayPush(pReq->ctb.tagName, name); } } else if (pReq->type == TSDB_NORMAL_TABLE) { diff --git a/source/common/src/ttszip.c b/source/common/src/ttszip.c index aeb9292b41..fe5c599d53 100644 --- a/source/common/src/ttszip.c +++ b/source/common/src/ttszip.c @@ -187,7 +187,9 @@ void* tsBufDestroy(STSBuf* pTSBuf) { if (pTSBuf->autoDelete) { // ("tsBuf %p destroyed, delete tmp file:%s", pTSBuf, pTSBuf->path); - taosRemoveFile(pTSBuf->path); + if (taosRemoveFile(pTSBuf->path) != 0) { + // tscError("tsBuf %p destroyed, failed to remove tmp file:%s", pTSBuf, pTSBuf->path); + } } else { // tscDebug("tsBuf %p destroyed, tmp file:%s, remains", pTSBuf, pTSBuf->path); } diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 2223954a5b..96a14be4cc 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -209,7 +209,7 @@ static int32_t countTrailingSpaces(const SValueNode* pVal, bool isLtrim) { return numOfSpaces; } -void static addTimezoneParam(SNodeList* pList) { +static int32_t addTimezoneParam(SNodeList* pList) { char buf[6] = {0}; time_t t = taosTime(NULL); struct tm tmInfo; @@ -218,6 +218,10 @@ void static addTimezoneParam(SNodeList* pList) { int32_t len = (int32_t)strlen(buf); SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); + if (pVal == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pVal->literal = strndup(buf, len); pVal->isDuration = false; pVal->translate = true; @@ -229,10 +233,15 @@ void static addTimezoneParam(SNodeList* pList) { strncpy(varDataVal(pVal->datum.p), pVal->literal, len); nodesListAppend(pList, (SNode*)pVal); + return TSDB_CODE_SUCCESS; } -void static addDbPrecisonParam(SNodeList** pList, uint8_t precision) { +static int32_t addDbPrecisonParam(SNodeList** pList, uint8_t precision) { SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); + if (pVal == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pVal->literal = NULL; pVal->isDuration = false; pVal->translate = true; @@ -244,6 +253,7 @@ void static addDbPrecisonParam(SNodeList** pList, uint8_t precision) { pVal->typeData = (int64_t)precision; nodesListMakeAppend(pList, (SNode*)pVal); + return TSDB_CODE_SUCCESS; } // There is only one parameter of numeric type, and the return type is parameter type @@ -465,7 +475,10 @@ static int32_t translateNowToday(SFunctionNode* pFunc, char* pErrBuf, int32_t le // add database precision as param uint8_t dbPrec = pFunc->node.resType.precision; - addDbPrecisonParam(&pFunc->pParameterList, dbPrec); + int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_TIMESTAMP}; return TSDB_CODE_SUCCESS; @@ -1487,7 +1500,10 @@ static int32_t translateIrate(SFunctionNode* pFunc, char* pErrBuf, int32_t len) // add database precision as param uint8_t dbPrec = pFunc->node.resType.precision; - addDbPrecisonParam(&pFunc->pParameterList, dbPrec); + int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE}; return TSDB_CODE_SUCCESS; @@ -1810,7 +1826,10 @@ static int32_t translateCast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { // add database precision as param uint8_t dbPrec = pFunc->node.resType.precision; - addDbPrecisonParam(&pFunc->pParameterList, dbPrec); + int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec); + if (code != TSDB_CODE_SUCCESS) { + return code; + } return TSDB_CODE_SUCCESS; } @@ -1844,7 +1863,10 @@ static int32_t translateToIso8601(SFunctionNode* pFunc, char* pErrBuf, int32_t l return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "Invalid timzone format"); } } else { // add default client timezone - addTimezoneParam(pFunc->pParameterList); + int32_t code = addTimezoneParam(pFunc->pParameterList); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } // set result type @@ -1863,7 +1885,10 @@ static int32_t translateToUnixtimestamp(SFunctionNode* pFunc, char* pErrBuf, int // add database precision as param uint8_t dbPrec = pFunc->node.resType.precision; - addDbPrecisonParam(&pFunc->pParameterList, dbPrec); + int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT}; return TSDB_CODE_SUCCESS; @@ -1894,7 +1919,10 @@ static int32_t translateTimeTruncate(SFunctionNode* pFunc, char* pErrBuf, int32_ "TIMETRUNCATE function time unit parameter should be one of the following: [1b, 1u, 1a, 1s, 1m, 1h, 1d, 1w]"); } - addDbPrecisonParam(&pFunc->pParameterList, dbPrec); + int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes, .type = TSDB_DATA_TYPE_TIMESTAMP}; @@ -1935,7 +1963,10 @@ static int32_t translateTimeDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t le } } - addDbPrecisonParam(&pFunc->pParameterList, dbPrec); + int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT}; return TSDB_CODE_SUCCESS; diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index dd5e4178bd..a093fe5116 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -1861,7 +1861,7 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) { numOfElem += 1; pStddevRes->count += 1; pStddevRes->usum += plist[i]; - pStddevRes->quadraticISum += plist[i] * plist[i]; + pStddevRes->quadraticUSum += plist[i] * plist[i]; } break; @@ -1877,7 +1877,7 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) { numOfElem += 1; pStddevRes->count += 1; pStddevRes->usum += plist[i]; - pStddevRes->quadraticISum += plist[i] * plist[i]; + pStddevRes->quadraticUSum += plist[i] * plist[i]; } break; } @@ -1892,7 +1892,7 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) { numOfElem += 1; pStddevRes->count += 1; pStddevRes->usum += plist[i]; - pStddevRes->quadraticISum += plist[i] * plist[i]; + pStddevRes->quadraticUSum += plist[i] * plist[i]; } break; @@ -1908,7 +1908,7 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) { numOfElem += 1; pStddevRes->count += 1; pStddevRes->usum += plist[i]; - pStddevRes->quadraticISum += plist[i] * plist[i]; + pStddevRes->quadraticUSum += plist[i] * plist[i]; } break; } @@ -5359,7 +5359,7 @@ int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); int32_t currentRow = pBlock->info.rows; - int32_t resIndex; + int32_t resIndex = -1; int32_t maxCount = 0; for (int32_t i = 0; i < pInfo->numOfPoints; ++i) { SModeItem* pItem = (SModeItem*)(pInfo->pItems + i * (sizeof(SModeItem) + pInfo->colBytes)); @@ -5369,8 +5369,12 @@ int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { } } - SModeItem* pResItem = (SModeItem*)(pInfo->pItems + resIndex * (sizeof(SModeItem) + pInfo->colBytes)); - colDataAppend(pCol, currentRow, pResItem->data, (maxCount == 0) ? true : false); + if (maxCount != 0) { + SModeItem* pResItem = (SModeItem*)(pInfo->pItems + resIndex * (sizeof(SModeItem) + pInfo->colBytes)); + colDataAppend(pCol, currentRow, pResItem->data, false); + } else { + colDataAppendNULL(pCol, currentRow); + } return pResInfo->numOfRes; } diff --git a/source/libs/function/src/tpercentile.c b/source/libs/function/src/tpercentile.c index 6a1427c63f..0924106476 100644 --- a/source/libs/function/src/tpercentile.c +++ b/source/libs/function/src/tpercentile.c @@ -495,7 +495,7 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction) int32_t groupId = getGroupId(pMemBucket->numOfSlots, i, pMemBucket->times - 1); SIDList list = taosHashGet(pMemBucket->groupPagesMap, &groupId, sizeof(groupId)); - assert(list->size > 0); + ASSERT(list != NULL && list->size > 0); for (int32_t f = 0; f < list->size; ++f) { SPageInfo *pgInfo = *(SPageInfo **)taosArrayGet(list, f); diff --git a/source/libs/parser/inc/parInsertUtil.h b/source/libs/parser/inc/parInsertUtil.h index 78e87d847c..1e941632e7 100644 --- a/source/libs/parser/inc/parInsertUtil.h +++ b/source/libs/parser/inc/parInsertUtil.h @@ -152,7 +152,7 @@ int32_t insInitRowBuilder(SRowBuilder *pBuilder, int16_t schemaVer, SParsedDataC int32_t insGetDataBlockFromList(SHashObj *pHashList, void *id, int32_t idLen, int32_t size, int32_t startOffset, int32_t rowSize, STableMeta *pTableMeta, STableDataBlocks **dataBlocks, SArray *pBlockList, SVCreateTbReq *pCreateTbReq); -int32_t insMergeTableDataBlocks(SHashObj *pHashObj, uint8_t payloadType, SArray **pVgDataBlocks); +int32_t insMergeTableDataBlocks(SHashObj *pHashObj, SArray **pVgDataBlocks); int32_t insBuildCreateTbMsg(STableDataBlocks *pBlocks, SVCreateTbReq *pCreateTbReq); int32_t insAllocateMemForSize(STableDataBlocks *pDataBlock, int32_t allSize); int32_t insCreateSName(SName *pName, struct SToken *pTableName, int32_t acctId, const char *dbName, SMsgBuf *pMsgBuf); diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index 84c9664fa9..1ccb34cd1f 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -1369,7 +1369,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { // merge according to vgId if (taosHashGetSize(pCxt->pTableBlockHashObj) > 0) { - CHECK_CODE(insMergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks)); + CHECK_CODE(insMergeTableDataBlocks(pCxt->pTableBlockHashObj, &pCxt->pVgDataBlocks)); } return insBuildOutput(pCxt); } @@ -1390,7 +1390,7 @@ static int32_t parseInsertBodyAgain(SInsertParseContext* pCxt) { parserDebug("0x%" PRIx64 " insert again input rows: %d", pCxt->pComCxt->requestId, pCxt->totalNum); // merge according to vgId if (taosHashGetSize(pCxt->pTableBlockHashObj) > 0) { - CHECK_CODE(insMergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks)); + CHECK_CODE(insMergeTableDataBlocks(pCxt->pTableBlockHashObj, &pCxt->pVgDataBlocks)); } return insBuildOutput(pCxt); } @@ -1472,8 +1472,6 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache } } - context.pOutput->payloadType = PAYLOAD_TYPE_KV; - int32_t code = TSDB_CODE_SUCCESS; if (!context.pComCxt->needMultiParse) { code = skipInsertInto(&context.pSql, &context.msg); diff --git a/source/libs/parser/src/parInsertStmt.c b/source/libs/parser/src/parInsertStmt.c index fda3e0866d..f85ceccf6e 100644 --- a/source/libs/parser/src/parInsertStmt.c +++ b/source/libs/parser/src/parInsertStmt.c @@ -40,8 +40,7 @@ int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash // merge according to vgId if (taosHashGetSize(insertCtx.pTableBlockHashObj) > 0) { - CHECK_CODE( - insMergeTableDataBlocks(insertCtx.pTableBlockHashObj, modifyNode->payloadType, &insertCtx.pVgDataBlocks)); + CHECK_CODE(insMergeTableDataBlocks(insertCtx.pTableBlockHashObj, &insertCtx.pVgDataBlocks)); } CHECK_CODE(insBuildOutput(&insertCtx)); diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index e921cfe997..570a6f9859 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -21,9 +21,6 @@ #include "querynodes.h" #include "tRealloc.h" -#define IS_RAW_PAYLOAD(t) \ - (((int)(t)) == PAYLOAD_TYPE_RAW) // 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert - typedef struct SBlockKeyTuple { TSKEY skey; void* payloadAddr; @@ -315,7 +312,7 @@ int32_t insGetDataBlockFromList(SHashObj* pHashList, void* id, int32_t idLen, in return TSDB_CODE_SUCCESS; } - +#if 0 static int32_t getRowExpandSize(STableMeta* pTableMeta) { int32_t result = TD_ROW_HEAD_LEN - sizeof(TSKEY); int32_t columns = getNumOfColumns(pTableMeta); @@ -328,6 +325,7 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) { result += (int32_t)TD_BITMAP_BYTES(columns - 1); return result; } +#endif void insDestroyBlockArrayList(SArray* pDataBlockList) { if (pDataBlockList == NULL) { @@ -359,6 +357,7 @@ void insDestroyBlockHashmap(SHashObj* pDataBlockHash) { taosHashCleanup(pDataBlockHash); } +#if 0 // data block is disordered, sort it in ascending order void sortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf) { SSubmitBlk* pBlocks = (SSubmitBlk*)dataBuf->pData; @@ -401,6 +400,7 @@ void sortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf) { dataBuf->prevTS = INT64_MIN; } +#endif // data block is disordered, sort it in ascending order static int sortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo) { @@ -667,68 +667,31 @@ static int sortMergeDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* p } // Erase the empty space reserved for binary data -static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SBlockKeyTuple* blkKeyTuple, - bool isRawPayload) { +static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SBlockKeyTuple* blkKeyTuple) { // TODO: optimize this function, handle the case while binary is not presented - STableMeta* pTableMeta = pTableDataBlock->pTableMeta; - STableComInfo tinfo = getTableInfo(pTableMeta); - SSchema* pSchema = getTableColumnSchema(pTableMeta); - int32_t nonDataLen = sizeof(SSubmitBlk) + pTableDataBlock->createTbReqLen; SSubmitBlk* pBlock = pDataBlock; memcpy(pDataBlock, pTableDataBlock->pData, nonDataLen); pDataBlock = (char*)pDataBlock + nonDataLen; - int32_t flen = 0; // original total length of row - if (isRawPayload) { - for (int32_t j = 0; j < tinfo.numOfColumns; ++j) { - flen += TYPE_BYTES[pSchema[j].type]; - } - } pBlock->schemaLen = pTableDataBlock->createTbReqLen; - - char* p = pTableDataBlock->pData + nonDataLen; pBlock->dataLen = 0; + int32_t numOfRows = pBlock->numOfRows; - - if (isRawPayload) { - SRowBuilder builder = {0}; - - tdSRowInit(&builder, pTableMeta->sversion); - tdSRowSetInfo(&builder, getNumOfColumns(pTableMeta), -1, flen); - - for (int32_t i = 0; i < numOfRows; ++i) { - tdSRowResetBuf(&builder, pDataBlock); - int toffset = 0; - for (int32_t j = 0; j < tinfo.numOfColumns; ++j) { - int8_t colType = pSchema[j].type; - uint8_t valType = isNull(p, colType) ? TD_VTYPE_NULL : TD_VTYPE_NORM; - tdAppendColValToRow(&builder, pSchema[j].colId, colType, valType, p, true, toffset, j); - toffset += TYPE_BYTES[colType]; - p += pSchema[j].bytes; - } - tdSRowEnd(&builder); - int32_t rowLen = TD_ROW_LEN((STSRow*)pDataBlock); - pDataBlock = (char*)pDataBlock + rowLen; - pBlock->dataLen += rowLen; - } - } else { - for (int32_t i = 0; i < numOfRows; ++i) { - void* payload = (blkKeyTuple + i)->payloadAddr; - TDRowLenT rowTLen = TD_ROW_LEN((STSRow*)payload); - memcpy(pDataBlock, payload, rowTLen); - pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen); - pBlock->dataLen += rowTLen; - } + for (int32_t i = 0; i < numOfRows; ++i) { + void* payload = (blkKeyTuple + i)->payloadAddr; + TDRowLenT rowTLen = TD_ROW_LEN((STSRow*)payload); + memcpy(pDataBlock, payload, rowTLen); + pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen); + pBlock->dataLen += rowTLen; } return pBlock->dataLen + pBlock->schemaLen; } -int32_t insMergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** pVgDataBlocks) { +int32_t insMergeTableDataBlocks(SHashObj* pHashObj, SArray** pVgDataBlocks) { const int INSERT_HEAD_SIZE = sizeof(SSubmitReq); int code = 0; - bool isRawPayload = IS_RAW_PAYLOAD(payloadType); SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false); SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES); @@ -754,8 +717,7 @@ int32_t insMergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray* } ASSERT(pOneTableBlock->pTableMeta->tableInfo.rowSize > 0); // the maximum expanded size in byte when a row-wise data is converted to SDataRow format - int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0; - int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize + + int64_t destSize = dataBuf->size + pOneTableBlock->size + sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta) + pOneTableBlock->createTbReqLen; @@ -774,23 +736,18 @@ int32_t insMergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray* } } - if (isRawPayload) { - sortRemoveDataBlockDupRowsRaw(pOneTableBlock); - } else { - if ((code = sortMergeDataBlockDupRows(pOneTableBlock, &blkKeyInfo, &pBlkRowMerger)) != 0) { - tdFreeSBlockRowMerger(pBlkRowMerger); - taosHashCleanup(pVnodeDataBlockHashList); - insDestroyBlockArrayList(pVnodeDataBlockList); - taosMemoryFreeClear(dataBuf->pData); - taosMemoryFreeClear(blkKeyInfo.pKeyTuple); - return code; - } - ASSERT(blkKeyInfo.pKeyTuple != NULL && pBlocks->numOfRows > 0); + if ((code = sortMergeDataBlockDupRows(pOneTableBlock, &blkKeyInfo, &pBlkRowMerger)) != 0) { + tdFreeSBlockRowMerger(pBlkRowMerger); + taosHashCleanup(pVnodeDataBlockHashList); + insDestroyBlockArrayList(pVnodeDataBlockList); + taosMemoryFreeClear(dataBuf->pData); + taosMemoryFreeClear(blkKeyInfo.pKeyTuple); + return code; } + ASSERT(blkKeyInfo.pKeyTuple != NULL && pBlocks->numOfRows > 0); // erase the empty space reserved for binary data - int32_t finalLen = - trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, blkKeyInfo.pKeyTuple, isRawPayload); + int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, blkKeyInfo.pKeyTuple); dataBuf->size += (finalLen + sizeof(SSubmitBlk)); assert(dataBuf->size <= dataBuf->nAllocSize); diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index a04d5a3a3a..e7f3fe7b49 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -1082,7 +1082,12 @@ int32_t filterAddUnitImpl(SFilterInfo *info, uint8_t optr, SFilterFieldId *left, if (info->unitNum >= info->unitSize) { uint32_t psize = info->unitSize; info->unitSize += FILTER_DEFAULT_UNIT_SIZE; - info->units = taosMemoryRealloc(info->units, info->unitSize * sizeof(SFilterUnit)); + + void *tmp = taosMemoryRealloc(info->units, info->unitSize * sizeof(SFilterUnit)); + if (tmp == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + info->units = tmp; memset(info->units + psize, 0, sizeof(*info->units) * FILTER_DEFAULT_UNIT_SIZE); } @@ -1135,7 +1140,12 @@ int32_t filterAddUnit(SFilterInfo *info, uint8_t optr, SFilterFieldId *left, SFi int32_t filterAddUnitToGroup(SFilterGroup *group, uint32_t unitIdx) { if (group->unitNum >= group->unitSize) { group->unitSize += FILTER_DEFAULT_UNIT_SIZE; - group->unitIdxs = taosMemoryRealloc(group->unitIdxs, group->unitSize * sizeof(*group->unitIdxs)); + + void *tmp = taosMemoryRealloc(group->unitIdxs, group->unitSize * sizeof(*group->unitIdxs)); + if (tmp == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + group->unitIdxs = tmp; } group->unitIdxs[group->unitNum++] = unitIdx; @@ -3712,7 +3722,7 @@ EDealRes fltReviseRewriter(SNode **pNode, void *pContext) { SListCell *cell = node->pParameterList->pHead; for (int32_t i = 0; i < node->pParameterList->length; ++i) { if (NULL == cell || NULL == cell->pNode) { - fltError("invalid cell, cell:%p, pNode:%p", cell, cell->pNode); + fltError("invalid cell"); stat->code = TSDB_CODE_QRY_INVALID_INPUT; return DEAL_RES_ERROR; } @@ -4066,6 +4076,10 @@ bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p, SC *p = output.columnData; output.numOfRows = pSrc->info.rows; + if (*p == NULL) { + return false; + } + bool keep = (*info->func)(info, pSrc->info.rows, *p, statis, numOfCols, &output.numOfQualified); // todo this should be return during filter procedure diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index cc1949f17d..7c59c70f30 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -331,7 +331,10 @@ int32_t sclInitParam(SNode *node, SScalarParam *param, SScalarCtx *ctx, int32_t ASSERT(param->columnData == NULL); param->numOfRows = 1; - /*int32_t code = */ sclCreateColumnInfoData(&valueNode->node.resType, 1, param); + int32_t code = sclCreateColumnInfoData(&valueNode->node.resType, 1, param); + if (code != TSDB_CODE_SUCCESS) { + SCL_RET(TSDB_CODE_OUT_OF_MEMORY); + } if (TSDB_DATA_TYPE_NULL == valueNode->node.resType.type || valueNode->isNull) { colDataAppendNULL(param->columnData, 0); } else { @@ -1485,8 +1488,13 @@ static int32_t sclGetMinusOperatorResType(SOperatorNode *pOp) { } static int32_t sclGetMathOperatorResType(SOperatorNode *pOp) { + if (pOp == NULL || pOp->pLeft == NULL || pOp->pRight == NULL) { + return TSDB_CODE_TSC_INVALID_OPERATION; + } + SDataType ldt = ((SExprNode *)(pOp->pLeft))->resType; SDataType rdt = ((SExprNode *)(pOp->pRight))->resType; + if ((TSDB_DATA_TYPE_TIMESTAMP == ldt.type && TSDB_DATA_TYPE_TIMESTAMP == rdt.type) || (TSDB_DATA_TYPE_TIMESTAMP == ldt.type && (IS_VAR_DATA_TYPE(rdt.type) || IS_FLOAT_TYPE(rdt.type))) || (TSDB_DATA_TYPE_TIMESTAMP == rdt.type && (IS_VAR_DATA_TYPE(ldt.type) || IS_FLOAT_TYPE(ldt.type)))) { @@ -1507,10 +1515,21 @@ static int32_t sclGetMathOperatorResType(SOperatorNode *pOp) { } static int32_t sclGetCompOperatorResType(SOperatorNode *pOp) { + if (pOp == NULL || pOp->pLeft == NULL) { + return TSDB_CODE_TSC_INVALID_OPERATION; + } + SDataType ldt = ((SExprNode *)(pOp->pLeft))->resType; + if (OP_TYPE_IN == pOp->opType || OP_TYPE_NOT_IN == pOp->opType) { + if (pOp->pRight == NULL) { + return TSDB_CODE_TSC_INVALID_OPERATION; + } ((SExprNode *)(pOp->pRight))->resType = ldt; } else if (nodesIsRegularOp(pOp)) { + if (pOp->pRight == NULL) { + return TSDB_CODE_TSC_INVALID_OPERATION; + } SDataType rdt = ((SExprNode *)(pOp->pRight))->resType; if (!IS_VAR_DATA_TYPE(ldt.type) || QUERY_NODE_VALUE != nodeType(pOp->pRight) || (!IS_STR_DATA_TYPE(rdt.type) && (rdt.type != TSDB_DATA_TYPE_NULL))) { @@ -1523,8 +1542,13 @@ static int32_t sclGetCompOperatorResType(SOperatorNode *pOp) { } static int32_t sclGetJsonOperatorResType(SOperatorNode *pOp) { + if (pOp == NULL || pOp->pLeft == NULL || pOp->pRight == NULL) { + return TSDB_CODE_TSC_INVALID_OPERATION; + } + SDataType ldt = ((SExprNode *)(pOp->pLeft))->resType; SDataType rdt = ((SExprNode *)(pOp->pRight))->resType; + if (TSDB_DATA_TYPE_JSON != ldt.type || !IS_STR_DATA_TYPE(rdt.type)) { return TSDB_CODE_TSC_INVALID_OPERATION; } diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 1d8744d726..7e93c9173c 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -1416,7 +1416,7 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p } else if (tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS) { timeVal[k] = timeVal[k] * 1000; } else if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) { - timeVal[k] = timeVal[k]; + timeVal[k] = timeVal[k] * 1; } else { hasNull = true; break; diff --git a/source/libs/scalar/src/sclvector.c b/source/libs/scalar/src/sclvector.c index 6480c53dbf..415ceeb2bd 100644 --- a/source/libs/scalar/src/sclvector.c +++ b/source/libs/scalar/src/sclvector.c @@ -344,8 +344,11 @@ static FORCE_INLINE void varToNchar(char *buf, SScalarParam *pOut, int32_t rowIn int32_t outputMaxLen = (inputLen + 1) * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE; char *t = taosMemoryCalloc(1, outputMaxLen); - /*int32_t resLen = */ taosMbsToUcs4(varDataVal(buf), inputLen, (TdUcs4 *)varDataVal(t), - outputMaxLen - VARSTR_HEADER_SIZE, &len); + int32_t ret = taosMbsToUcs4(varDataVal(buf), inputLen, (TdUcs4 *)varDataVal(t), + outputMaxLen - VARSTR_HEADER_SIZE, &len); + if (!ret) { + sclError("failed to convert to NCHAR"); + } varDataSetLen(t, len); colDataAppend(pOut->columnData, rowIndex, t, false);