Merge branch '3.0' of github.com:taosdata/TDengine into szhou/fixbugs

This commit is contained in:
plum-lihui 2022-10-25 13:54:57 +08:00
commit 40fae0842f
17 changed files with 133 additions and 109 deletions

View File

@ -290,6 +290,7 @@ typedef struct {
(IS_SIGNED_NUMERIC_TYPE(_t) || (_t) == (TSDB_DATA_TYPE_BOOL) || (_t) == (TSDB_DATA_TYPE_TIMESTAMP)) (IS_SIGNED_NUMERIC_TYPE(_t) || (_t) == (TSDB_DATA_TYPE_BOOL) || (_t) == (TSDB_DATA_TYPE_TIMESTAMP))
#define IS_CONVERT_AS_UNSIGNED(_t) (IS_UNSIGNED_NUMERIC_TYPE(_t) || (_t) == (TSDB_DATA_TYPE_BOOL)) #define IS_CONVERT_AS_UNSIGNED(_t) (IS_UNSIGNED_NUMERIC_TYPE(_t) || (_t) == (TSDB_DATA_TYPE_BOOL))
#if 0
// TODO remove this function // TODO remove this function
static FORCE_INLINE bool isNull(const void *val, int32_t type) { static FORCE_INLINE bool isNull(const void *val, int32_t type) {
switch (type) { switch (type) {
@ -325,6 +326,7 @@ static FORCE_INLINE bool isNull(const void *val, int32_t type) {
return false; return false;
}; };
} }
#endif
typedef struct tDataTypeDescriptor { typedef struct tDataTypeDescriptor {
int16_t type; int16_t type;

View File

@ -347,11 +347,6 @@ typedef struct SInsertStmt {
uint8_t precision; uint8_t precision;
} SInsertStmt; } SInsertStmt;
typedef enum {
PAYLOAD_TYPE_KV = 0,
PAYLOAD_TYPE_RAW = 1,
} EPayloadType;
typedef struct SVgDataBlocks { typedef struct SVgDataBlocks {
SVgroupInfo vg; SVgroupInfo vg;
int32_t numOfTables; // number of tables in current submit block int32_t numOfTables; // number of tables in current submit block
@ -363,7 +358,6 @@ typedef struct SVnodeModifOpStmt {
ENodeType nodeType; ENodeType nodeType;
ENodeType sqlNodeType; ENodeType sqlNodeType;
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>. SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
uint32_t insertType; // insert data from [file|sql statement| bound statement] uint32_t insertType; // insert data from [file|sql statement| bound statement]
const char* sql; // current sql statement position const char* sql; // current sql statement position
} SVnodeModifOpStmt; } SVnodeModifOpStmt;

View File

@ -1369,7 +1369,6 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
goto end; goto end;
} }
SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot); SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
nodeStmt->payloadType = PAYLOAD_TYPE_KV;
nodeStmt->pDataBlocks = taosArrayInit(1, POINTER_BYTES); nodeStmt->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks)); SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
@ -1625,7 +1624,6 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
goto end; goto end;
} }
SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot); SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
nodeStmt->payloadType = PAYLOAD_TYPE_KV;
int32_t numOfVg = taosHashGetSize(pVgHash); int32_t numOfVg = taosHashGetSize(pVgHash);
nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES); nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
@ -1929,7 +1927,6 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
goto end; goto end;
} }
SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot); SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
nodeStmt->payloadType = PAYLOAD_TYPE_KV;
int32_t numOfVg = taosHashGetSize(pVgHash); int32_t numOfVg = taosHashGetSize(pVgHash);
nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES); nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);

View File

@ -1531,7 +1531,6 @@ static SSmlHandle *smlBuildSmlInfo(STscObj *pTscObj, SRequestObj *request, SMLPr
uError("SML:0x%" PRIx64 " create info->pQuery->pRoot error", info->id); uError("SML:0x%" PRIx64 " create info->pQuery->pRoot error", info->id);
goto cleanup; goto cleanup;
} }
((SVnodeModifOpStmt *)(info->pQuery->pRoot))->payloadType = PAYLOAD_TYPE_KV;
if (pTscObj) { if (pTscObj) {
info->taos = pTscObj; info->taos = pTscObj;

View File

@ -5158,7 +5158,7 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) {
char name[TSDB_COL_NAME_LEN] = {0}; char name[TSDB_COL_NAME_LEN] = {0};
char *tmp = NULL; char *tmp = NULL;
if (tDecodeCStr(pCoder, &tmp) < 0) return -1; if (tDecodeCStr(pCoder, &tmp) < 0) return -1;
strcpy(name, tmp); strncpy(name, tmp, TSDB_COL_NAME_LEN - 1);
taosArrayPush(pReq->ctb.tagName, name); taosArrayPush(pReq->ctb.tagName, name);
} }
} else if (pReq->type == TSDB_NORMAL_TABLE) { } else if (pReq->type == TSDB_NORMAL_TABLE) {

View File

@ -187,7 +187,9 @@ void* tsBufDestroy(STSBuf* pTSBuf) {
if (pTSBuf->autoDelete) { if (pTSBuf->autoDelete) {
// ("tsBuf %p destroyed, delete tmp file:%s", pTSBuf, pTSBuf->path); // ("tsBuf %p destroyed, delete tmp file:%s", pTSBuf, pTSBuf->path);
taosRemoveFile(pTSBuf->path); if (taosRemoveFile(pTSBuf->path) != 0) {
// tscError("tsBuf %p destroyed, failed to remove tmp file:%s", pTSBuf, pTSBuf->path);
}
} else { } else {
// tscDebug("tsBuf %p destroyed, tmp file:%s, remains", pTSBuf, pTSBuf->path); // tscDebug("tsBuf %p destroyed, tmp file:%s, remains", pTSBuf, pTSBuf->path);
} }

View File

@ -209,7 +209,7 @@ static int32_t countTrailingSpaces(const SValueNode* pVal, bool isLtrim) {
return numOfSpaces; return numOfSpaces;
} }
void static addTimezoneParam(SNodeList* pList) { static int32_t addTimezoneParam(SNodeList* pList) {
char buf[6] = {0}; char buf[6] = {0};
time_t t = taosTime(NULL); time_t t = taosTime(NULL);
struct tm tmInfo; struct tm tmInfo;
@ -218,6 +218,10 @@ void static addTimezoneParam(SNodeList* pList) {
int32_t len = (int32_t)strlen(buf); int32_t len = (int32_t)strlen(buf);
SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
if (pVal == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pVal->literal = strndup(buf, len); pVal->literal = strndup(buf, len);
pVal->isDuration = false; pVal->isDuration = false;
pVal->translate = true; pVal->translate = true;
@ -229,10 +233,15 @@ void static addTimezoneParam(SNodeList* pList) {
strncpy(varDataVal(pVal->datum.p), pVal->literal, len); strncpy(varDataVal(pVal->datum.p), pVal->literal, len);
nodesListAppend(pList, (SNode*)pVal); nodesListAppend(pList, (SNode*)pVal);
return TSDB_CODE_SUCCESS;
} }
void static addDbPrecisonParam(SNodeList** pList, uint8_t precision) { static int32_t addDbPrecisonParam(SNodeList** pList, uint8_t precision) {
SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
if (pVal == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pVal->literal = NULL; pVal->literal = NULL;
pVal->isDuration = false; pVal->isDuration = false;
pVal->translate = true; pVal->translate = true;
@ -244,6 +253,7 @@ void static addDbPrecisonParam(SNodeList** pList, uint8_t precision) {
pVal->typeData = (int64_t)precision; pVal->typeData = (int64_t)precision;
nodesListMakeAppend(pList, (SNode*)pVal); nodesListMakeAppend(pList, (SNode*)pVal);
return TSDB_CODE_SUCCESS;
} }
// There is only one parameter of numeric type, and the return type is parameter type // There is only one parameter of numeric type, and the return type is parameter type
@ -465,7 +475,10 @@ static int32_t translateNowToday(SFunctionNode* pFunc, char* pErrBuf, int32_t le
// add database precision as param // add database precision as param
uint8_t dbPrec = pFunc->node.resType.precision; uint8_t dbPrec = pFunc->node.resType.precision;
addDbPrecisonParam(&pFunc->pParameterList, dbPrec); int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_TIMESTAMP}; pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_TIMESTAMP};
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1487,7 +1500,10 @@ static int32_t translateIrate(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
// add database precision as param // add database precision as param
uint8_t dbPrec = pFunc->node.resType.precision; uint8_t dbPrec = pFunc->node.resType.precision;
addDbPrecisonParam(&pFunc->pParameterList, dbPrec); int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE}; pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE};
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1810,7 +1826,10 @@ static int32_t translateCast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
// add database precision as param // add database precision as param
uint8_t dbPrec = pFunc->node.resType.precision; uint8_t dbPrec = pFunc->node.resType.precision;
addDbPrecisonParam(&pFunc->pParameterList, dbPrec); int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1844,7 +1863,10 @@ static int32_t translateToIso8601(SFunctionNode* pFunc, char* pErrBuf, int32_t l
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "Invalid timzone format"); return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "Invalid timzone format");
} }
} else { // add default client timezone } else { // add default client timezone
addTimezoneParam(pFunc->pParameterList); int32_t code = addTimezoneParam(pFunc->pParameterList);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} }
// set result type // set result type
@ -1863,7 +1885,10 @@ static int32_t translateToUnixtimestamp(SFunctionNode* pFunc, char* pErrBuf, int
// add database precision as param // add database precision as param
uint8_t dbPrec = pFunc->node.resType.precision; uint8_t dbPrec = pFunc->node.resType.precision;
addDbPrecisonParam(&pFunc->pParameterList, dbPrec); int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT}; pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1894,7 +1919,10 @@ static int32_t translateTimeTruncate(SFunctionNode* pFunc, char* pErrBuf, int32_
"TIMETRUNCATE function time unit parameter should be one of the following: [1b, 1u, 1a, 1s, 1m, 1h, 1d, 1w]"); "TIMETRUNCATE function time unit parameter should be one of the following: [1b, 1u, 1a, 1s, 1m, 1h, 1d, 1w]");
} }
addDbPrecisonParam(&pFunc->pParameterList, dbPrec); int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pFunc->node.resType = pFunc->node.resType =
(SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes, .type = TSDB_DATA_TYPE_TIMESTAMP}; (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes, .type = TSDB_DATA_TYPE_TIMESTAMP};
@ -1935,7 +1963,10 @@ static int32_t translateTimeDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t le
} }
} }
addDbPrecisonParam(&pFunc->pParameterList, dbPrec); int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT}; pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;

View File

@ -1861,7 +1861,7 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
numOfElem += 1; numOfElem += 1;
pStddevRes->count += 1; pStddevRes->count += 1;
pStddevRes->usum += plist[i]; pStddevRes->usum += plist[i];
pStddevRes->quadraticISum += plist[i] * plist[i]; pStddevRes->quadraticUSum += plist[i] * plist[i];
} }
break; break;
@ -1877,7 +1877,7 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
numOfElem += 1; numOfElem += 1;
pStddevRes->count += 1; pStddevRes->count += 1;
pStddevRes->usum += plist[i]; pStddevRes->usum += plist[i];
pStddevRes->quadraticISum += plist[i] * plist[i]; pStddevRes->quadraticUSum += plist[i] * plist[i];
} }
break; break;
} }
@ -1892,7 +1892,7 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
numOfElem += 1; numOfElem += 1;
pStddevRes->count += 1; pStddevRes->count += 1;
pStddevRes->usum += plist[i]; pStddevRes->usum += plist[i];
pStddevRes->quadraticISum += plist[i] * plist[i]; pStddevRes->quadraticUSum += plist[i] * plist[i];
} }
break; break;
@ -1908,7 +1908,7 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
numOfElem += 1; numOfElem += 1;
pStddevRes->count += 1; pStddevRes->count += 1;
pStddevRes->usum += plist[i]; pStddevRes->usum += plist[i];
pStddevRes->quadraticISum += plist[i] * plist[i]; pStddevRes->quadraticUSum += plist[i] * plist[i];
} }
break; break;
} }
@ -5359,7 +5359,7 @@ int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
int32_t currentRow = pBlock->info.rows; int32_t currentRow = pBlock->info.rows;
int32_t resIndex; int32_t resIndex = -1;
int32_t maxCount = 0; int32_t maxCount = 0;
for (int32_t i = 0; i < pInfo->numOfPoints; ++i) { for (int32_t i = 0; i < pInfo->numOfPoints; ++i) {
SModeItem* pItem = (SModeItem*)(pInfo->pItems + i * (sizeof(SModeItem) + pInfo->colBytes)); SModeItem* pItem = (SModeItem*)(pInfo->pItems + i * (sizeof(SModeItem) + pInfo->colBytes));
@ -5369,8 +5369,12 @@ int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
} }
} }
SModeItem* pResItem = (SModeItem*)(pInfo->pItems + resIndex * (sizeof(SModeItem) + pInfo->colBytes)); if (maxCount != 0) {
colDataAppend(pCol, currentRow, pResItem->data, (maxCount == 0) ? true : false); SModeItem* pResItem = (SModeItem*)(pInfo->pItems + resIndex * (sizeof(SModeItem) + pInfo->colBytes));
colDataAppend(pCol, currentRow, pResItem->data, false);
} else {
colDataAppendNULL(pCol, currentRow);
}
return pResInfo->numOfRes; return pResInfo->numOfRes;
} }

View File

@ -495,7 +495,7 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction)
int32_t groupId = getGroupId(pMemBucket->numOfSlots, i, pMemBucket->times - 1); int32_t groupId = getGroupId(pMemBucket->numOfSlots, i, pMemBucket->times - 1);
SIDList list = taosHashGet(pMemBucket->groupPagesMap, &groupId, sizeof(groupId)); SIDList list = taosHashGet(pMemBucket->groupPagesMap, &groupId, sizeof(groupId));
assert(list->size > 0); ASSERT(list != NULL && list->size > 0);
for (int32_t f = 0; f < list->size; ++f) { for (int32_t f = 0; f < list->size; ++f) {
SPageInfo *pgInfo = *(SPageInfo **)taosArrayGet(list, f); SPageInfo *pgInfo = *(SPageInfo **)taosArrayGet(list, f);

View File

@ -152,7 +152,7 @@ int32_t insInitRowBuilder(SRowBuilder *pBuilder, int16_t schemaVer, SParsedDataC
int32_t insGetDataBlockFromList(SHashObj *pHashList, void *id, int32_t idLen, int32_t size, int32_t startOffset, int32_t insGetDataBlockFromList(SHashObj *pHashList, void *id, int32_t idLen, int32_t size, int32_t startOffset,
int32_t rowSize, STableMeta *pTableMeta, STableDataBlocks **dataBlocks, int32_t rowSize, STableMeta *pTableMeta, STableDataBlocks **dataBlocks,
SArray *pBlockList, SVCreateTbReq *pCreateTbReq); SArray *pBlockList, SVCreateTbReq *pCreateTbReq);
int32_t insMergeTableDataBlocks(SHashObj *pHashObj, uint8_t payloadType, SArray **pVgDataBlocks); int32_t insMergeTableDataBlocks(SHashObj *pHashObj, SArray **pVgDataBlocks);
int32_t insBuildCreateTbMsg(STableDataBlocks *pBlocks, SVCreateTbReq *pCreateTbReq); int32_t insBuildCreateTbMsg(STableDataBlocks *pBlocks, SVCreateTbReq *pCreateTbReq);
int32_t insAllocateMemForSize(STableDataBlocks *pDataBlock, int32_t allSize); int32_t insAllocateMemForSize(STableDataBlocks *pDataBlock, int32_t allSize);
int32_t insCreateSName(SName *pName, struct SToken *pTableName, int32_t acctId, const char *dbName, SMsgBuf *pMsgBuf); int32_t insCreateSName(SName *pName, struct SToken *pTableName, int32_t acctId, const char *dbName, SMsgBuf *pMsgBuf);

View File

@ -1369,7 +1369,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
// merge according to vgId // merge according to vgId
if (taosHashGetSize(pCxt->pTableBlockHashObj) > 0) { if (taosHashGetSize(pCxt->pTableBlockHashObj) > 0) {
CHECK_CODE(insMergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks)); CHECK_CODE(insMergeTableDataBlocks(pCxt->pTableBlockHashObj, &pCxt->pVgDataBlocks));
} }
return insBuildOutput(pCxt); return insBuildOutput(pCxt);
} }
@ -1390,7 +1390,7 @@ static int32_t parseInsertBodyAgain(SInsertParseContext* pCxt) {
parserDebug("0x%" PRIx64 " insert again input rows: %d", pCxt->pComCxt->requestId, pCxt->totalNum); parserDebug("0x%" PRIx64 " insert again input rows: %d", pCxt->pComCxt->requestId, pCxt->totalNum);
// merge according to vgId // merge according to vgId
if (taosHashGetSize(pCxt->pTableBlockHashObj) > 0) { if (taosHashGetSize(pCxt->pTableBlockHashObj) > 0) {
CHECK_CODE(insMergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks)); CHECK_CODE(insMergeTableDataBlocks(pCxt->pTableBlockHashObj, &pCxt->pVgDataBlocks));
} }
return insBuildOutput(pCxt); return insBuildOutput(pCxt);
} }
@ -1472,8 +1472,6 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache
} }
} }
context.pOutput->payloadType = PAYLOAD_TYPE_KV;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (!context.pComCxt->needMultiParse) { if (!context.pComCxt->needMultiParse) {
code = skipInsertInto(&context.pSql, &context.msg); code = skipInsertInto(&context.pSql, &context.msg);

View File

@ -40,8 +40,7 @@ int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash
// merge according to vgId // merge according to vgId
if (taosHashGetSize(insertCtx.pTableBlockHashObj) > 0) { if (taosHashGetSize(insertCtx.pTableBlockHashObj) > 0) {
CHECK_CODE( CHECK_CODE(insMergeTableDataBlocks(insertCtx.pTableBlockHashObj, &insertCtx.pVgDataBlocks));
insMergeTableDataBlocks(insertCtx.pTableBlockHashObj, modifyNode->payloadType, &insertCtx.pVgDataBlocks));
} }
CHECK_CODE(insBuildOutput(&insertCtx)); CHECK_CODE(insBuildOutput(&insertCtx));

View File

@ -21,9 +21,6 @@
#include "querynodes.h" #include "querynodes.h"
#include "tRealloc.h" #include "tRealloc.h"
#define IS_RAW_PAYLOAD(t) \
(((int)(t)) == PAYLOAD_TYPE_RAW) // 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
typedef struct SBlockKeyTuple { typedef struct SBlockKeyTuple {
TSKEY skey; TSKEY skey;
void* payloadAddr; void* payloadAddr;
@ -315,7 +312,7 @@ int32_t insGetDataBlockFromList(SHashObj* pHashList, void* id, int32_t idLen, in
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
#if 0
static int32_t getRowExpandSize(STableMeta* pTableMeta) { static int32_t getRowExpandSize(STableMeta* pTableMeta) {
int32_t result = TD_ROW_HEAD_LEN - sizeof(TSKEY); int32_t result = TD_ROW_HEAD_LEN - sizeof(TSKEY);
int32_t columns = getNumOfColumns(pTableMeta); int32_t columns = getNumOfColumns(pTableMeta);
@ -328,6 +325,7 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
result += (int32_t)TD_BITMAP_BYTES(columns - 1); result += (int32_t)TD_BITMAP_BYTES(columns - 1);
return result; return result;
} }
#endif
void insDestroyBlockArrayList(SArray* pDataBlockList) { void insDestroyBlockArrayList(SArray* pDataBlockList) {
if (pDataBlockList == NULL) { if (pDataBlockList == NULL) {
@ -359,6 +357,7 @@ void insDestroyBlockHashmap(SHashObj* pDataBlockHash) {
taosHashCleanup(pDataBlockHash); taosHashCleanup(pDataBlockHash);
} }
#if 0
// data block is disordered, sort it in ascending order // data block is disordered, sort it in ascending order
void sortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf) { void sortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf) {
SSubmitBlk* pBlocks = (SSubmitBlk*)dataBuf->pData; SSubmitBlk* pBlocks = (SSubmitBlk*)dataBuf->pData;
@ -401,6 +400,7 @@ void sortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf) {
dataBuf->prevTS = INT64_MIN; dataBuf->prevTS = INT64_MIN;
} }
#endif
// data block is disordered, sort it in ascending order // data block is disordered, sort it in ascending order
static int sortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo) { static int sortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo) {
@ -667,68 +667,31 @@ static int sortMergeDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* p
} }
// Erase the empty space reserved for binary data // Erase the empty space reserved for binary data
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SBlockKeyTuple* blkKeyTuple, static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SBlockKeyTuple* blkKeyTuple) {
bool isRawPayload) {
// TODO: optimize this function, handle the case while binary is not presented // TODO: optimize this function, handle the case while binary is not presented
STableMeta* pTableMeta = pTableDataBlock->pTableMeta;
STableComInfo tinfo = getTableInfo(pTableMeta);
SSchema* pSchema = getTableColumnSchema(pTableMeta);
int32_t nonDataLen = sizeof(SSubmitBlk) + pTableDataBlock->createTbReqLen; int32_t nonDataLen = sizeof(SSubmitBlk) + pTableDataBlock->createTbReqLen;
SSubmitBlk* pBlock = pDataBlock; SSubmitBlk* pBlock = pDataBlock;
memcpy(pDataBlock, pTableDataBlock->pData, nonDataLen); memcpy(pDataBlock, pTableDataBlock->pData, nonDataLen);
pDataBlock = (char*)pDataBlock + nonDataLen; pDataBlock = (char*)pDataBlock + nonDataLen;
int32_t flen = 0; // original total length of row
if (isRawPayload) {
for (int32_t j = 0; j < tinfo.numOfColumns; ++j) {
flen += TYPE_BYTES[pSchema[j].type];
}
}
pBlock->schemaLen = pTableDataBlock->createTbReqLen; pBlock->schemaLen = pTableDataBlock->createTbReqLen;
char* p = pTableDataBlock->pData + nonDataLen;
pBlock->dataLen = 0; pBlock->dataLen = 0;
int32_t numOfRows = pBlock->numOfRows; int32_t numOfRows = pBlock->numOfRows;
for (int32_t i = 0; i < numOfRows; ++i) {
if (isRawPayload) { void* payload = (blkKeyTuple + i)->payloadAddr;
SRowBuilder builder = {0}; TDRowLenT rowTLen = TD_ROW_LEN((STSRow*)payload);
memcpy(pDataBlock, payload, rowTLen);
tdSRowInit(&builder, pTableMeta->sversion); pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
tdSRowSetInfo(&builder, getNumOfColumns(pTableMeta), -1, flen); pBlock->dataLen += rowTLen;
for (int32_t i = 0; i < numOfRows; ++i) {
tdSRowResetBuf(&builder, pDataBlock);
int toffset = 0;
for (int32_t j = 0; j < tinfo.numOfColumns; ++j) {
int8_t colType = pSchema[j].type;
uint8_t valType = isNull(p, colType) ? TD_VTYPE_NULL : TD_VTYPE_NORM;
tdAppendColValToRow(&builder, pSchema[j].colId, colType, valType, p, true, toffset, j);
toffset += TYPE_BYTES[colType];
p += pSchema[j].bytes;
}
tdSRowEnd(&builder);
int32_t rowLen = TD_ROW_LEN((STSRow*)pDataBlock);
pDataBlock = (char*)pDataBlock + rowLen;
pBlock->dataLen += rowLen;
}
} else {
for (int32_t i = 0; i < numOfRows; ++i) {
void* payload = (blkKeyTuple + i)->payloadAddr;
TDRowLenT rowTLen = TD_ROW_LEN((STSRow*)payload);
memcpy(pDataBlock, payload, rowTLen);
pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
pBlock->dataLen += rowTLen;
}
} }
return pBlock->dataLen + pBlock->schemaLen; return pBlock->dataLen + pBlock->schemaLen;
} }
int32_t insMergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** pVgDataBlocks) { int32_t insMergeTableDataBlocks(SHashObj* pHashObj, SArray** pVgDataBlocks) {
const int INSERT_HEAD_SIZE = sizeof(SSubmitReq); const int INSERT_HEAD_SIZE = sizeof(SSubmitReq);
int code = 0; int code = 0;
bool isRawPayload = IS_RAW_PAYLOAD(payloadType);
SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false); SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES); SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
@ -754,8 +717,7 @@ int32_t insMergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray*
} }
ASSERT(pOneTableBlock->pTableMeta->tableInfo.rowSize > 0); ASSERT(pOneTableBlock->pTableMeta->tableInfo.rowSize > 0);
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format // the maximum expanded size in byte when a row-wise data is converted to SDataRow format
int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0; int64_t destSize = dataBuf->size + pOneTableBlock->size +
int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize +
sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta) + sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta) +
pOneTableBlock->createTbReqLen; pOneTableBlock->createTbReqLen;
@ -774,23 +736,18 @@ int32_t insMergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray*
} }
} }
if (isRawPayload) { if ((code = sortMergeDataBlockDupRows(pOneTableBlock, &blkKeyInfo, &pBlkRowMerger)) != 0) {
sortRemoveDataBlockDupRowsRaw(pOneTableBlock); tdFreeSBlockRowMerger(pBlkRowMerger);
} else { taosHashCleanup(pVnodeDataBlockHashList);
if ((code = sortMergeDataBlockDupRows(pOneTableBlock, &blkKeyInfo, &pBlkRowMerger)) != 0) { insDestroyBlockArrayList(pVnodeDataBlockList);
tdFreeSBlockRowMerger(pBlkRowMerger); taosMemoryFreeClear(dataBuf->pData);
taosHashCleanup(pVnodeDataBlockHashList); taosMemoryFreeClear(blkKeyInfo.pKeyTuple);
insDestroyBlockArrayList(pVnodeDataBlockList); return code;
taosMemoryFreeClear(dataBuf->pData);
taosMemoryFreeClear(blkKeyInfo.pKeyTuple);
return code;
}
ASSERT(blkKeyInfo.pKeyTuple != NULL && pBlocks->numOfRows > 0);
} }
ASSERT(blkKeyInfo.pKeyTuple != NULL && pBlocks->numOfRows > 0);
// erase the empty space reserved for binary data // erase the empty space reserved for binary data
int32_t finalLen = int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, blkKeyInfo.pKeyTuple);
trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, blkKeyInfo.pKeyTuple, isRawPayload);
dataBuf->size += (finalLen + sizeof(SSubmitBlk)); dataBuf->size += (finalLen + sizeof(SSubmitBlk));
assert(dataBuf->size <= dataBuf->nAllocSize); assert(dataBuf->size <= dataBuf->nAllocSize);

View File

@ -1082,7 +1082,12 @@ int32_t filterAddUnitImpl(SFilterInfo *info, uint8_t optr, SFilterFieldId *left,
if (info->unitNum >= info->unitSize) { if (info->unitNum >= info->unitSize) {
uint32_t psize = info->unitSize; uint32_t psize = info->unitSize;
info->unitSize += FILTER_DEFAULT_UNIT_SIZE; info->unitSize += FILTER_DEFAULT_UNIT_SIZE;
info->units = taosMemoryRealloc(info->units, info->unitSize * sizeof(SFilterUnit));
void *tmp = taosMemoryRealloc(info->units, info->unitSize * sizeof(SFilterUnit));
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
info->units = tmp;
memset(info->units + psize, 0, sizeof(*info->units) * FILTER_DEFAULT_UNIT_SIZE); memset(info->units + psize, 0, sizeof(*info->units) * FILTER_DEFAULT_UNIT_SIZE);
} }
@ -1135,7 +1140,12 @@ int32_t filterAddUnit(SFilterInfo *info, uint8_t optr, SFilterFieldId *left, SFi
int32_t filterAddUnitToGroup(SFilterGroup *group, uint32_t unitIdx) { int32_t filterAddUnitToGroup(SFilterGroup *group, uint32_t unitIdx) {
if (group->unitNum >= group->unitSize) { if (group->unitNum >= group->unitSize) {
group->unitSize += FILTER_DEFAULT_UNIT_SIZE; group->unitSize += FILTER_DEFAULT_UNIT_SIZE;
group->unitIdxs = taosMemoryRealloc(group->unitIdxs, group->unitSize * sizeof(*group->unitIdxs));
void *tmp = taosMemoryRealloc(group->unitIdxs, group->unitSize * sizeof(*group->unitIdxs));
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
group->unitIdxs = tmp;
} }
group->unitIdxs[group->unitNum++] = unitIdx; group->unitIdxs[group->unitNum++] = unitIdx;
@ -3712,7 +3722,7 @@ EDealRes fltReviseRewriter(SNode **pNode, void *pContext) {
SListCell *cell = node->pParameterList->pHead; SListCell *cell = node->pParameterList->pHead;
for (int32_t i = 0; i < node->pParameterList->length; ++i) { for (int32_t i = 0; i < node->pParameterList->length; ++i) {
if (NULL == cell || NULL == cell->pNode) { if (NULL == cell || NULL == cell->pNode) {
fltError("invalid cell, cell:%p, pNode:%p", cell, cell->pNode); fltError("invalid cell");
stat->code = TSDB_CODE_QRY_INVALID_INPUT; stat->code = TSDB_CODE_QRY_INVALID_INPUT;
return DEAL_RES_ERROR; return DEAL_RES_ERROR;
} }
@ -4066,6 +4076,10 @@ bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p, SC
*p = output.columnData; *p = output.columnData;
output.numOfRows = pSrc->info.rows; output.numOfRows = pSrc->info.rows;
if (*p == NULL) {
return false;
}
bool keep = (*info->func)(info, pSrc->info.rows, *p, statis, numOfCols, &output.numOfQualified); bool keep = (*info->func)(info, pSrc->info.rows, *p, statis, numOfCols, &output.numOfQualified);
// todo this should be return during filter procedure // todo this should be return during filter procedure

View File

@ -331,7 +331,10 @@ int32_t sclInitParam(SNode *node, SScalarParam *param, SScalarCtx *ctx, int32_t
ASSERT(param->columnData == NULL); ASSERT(param->columnData == NULL);
param->numOfRows = 1; param->numOfRows = 1;
/*int32_t code = */ sclCreateColumnInfoData(&valueNode->node.resType, 1, param); int32_t code = sclCreateColumnInfoData(&valueNode->node.resType, 1, param);
if (code != TSDB_CODE_SUCCESS) {
SCL_RET(TSDB_CODE_OUT_OF_MEMORY);
}
if (TSDB_DATA_TYPE_NULL == valueNode->node.resType.type || valueNode->isNull) { if (TSDB_DATA_TYPE_NULL == valueNode->node.resType.type || valueNode->isNull) {
colDataAppendNULL(param->columnData, 0); colDataAppendNULL(param->columnData, 0);
} else { } else {
@ -1485,8 +1488,13 @@ static int32_t sclGetMinusOperatorResType(SOperatorNode *pOp) {
} }
static int32_t sclGetMathOperatorResType(SOperatorNode *pOp) { static int32_t sclGetMathOperatorResType(SOperatorNode *pOp) {
if (pOp == NULL || pOp->pLeft == NULL || pOp->pRight == NULL) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
SDataType ldt = ((SExprNode *)(pOp->pLeft))->resType; SDataType ldt = ((SExprNode *)(pOp->pLeft))->resType;
SDataType rdt = ((SExprNode *)(pOp->pRight))->resType; SDataType rdt = ((SExprNode *)(pOp->pRight))->resType;
if ((TSDB_DATA_TYPE_TIMESTAMP == ldt.type && TSDB_DATA_TYPE_TIMESTAMP == rdt.type) || if ((TSDB_DATA_TYPE_TIMESTAMP == ldt.type && TSDB_DATA_TYPE_TIMESTAMP == rdt.type) ||
(TSDB_DATA_TYPE_TIMESTAMP == ldt.type && (IS_VAR_DATA_TYPE(rdt.type) || IS_FLOAT_TYPE(rdt.type))) || (TSDB_DATA_TYPE_TIMESTAMP == ldt.type && (IS_VAR_DATA_TYPE(rdt.type) || IS_FLOAT_TYPE(rdt.type))) ||
(TSDB_DATA_TYPE_TIMESTAMP == rdt.type && (IS_VAR_DATA_TYPE(ldt.type) || IS_FLOAT_TYPE(ldt.type)))) { (TSDB_DATA_TYPE_TIMESTAMP == rdt.type && (IS_VAR_DATA_TYPE(ldt.type) || IS_FLOAT_TYPE(ldt.type)))) {
@ -1507,10 +1515,21 @@ static int32_t sclGetMathOperatorResType(SOperatorNode *pOp) {
} }
static int32_t sclGetCompOperatorResType(SOperatorNode *pOp) { static int32_t sclGetCompOperatorResType(SOperatorNode *pOp) {
if (pOp == NULL || pOp->pLeft == NULL) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
SDataType ldt = ((SExprNode *)(pOp->pLeft))->resType; SDataType ldt = ((SExprNode *)(pOp->pLeft))->resType;
if (OP_TYPE_IN == pOp->opType || OP_TYPE_NOT_IN == pOp->opType) { if (OP_TYPE_IN == pOp->opType || OP_TYPE_NOT_IN == pOp->opType) {
if (pOp->pRight == NULL) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
((SExprNode *)(pOp->pRight))->resType = ldt; ((SExprNode *)(pOp->pRight))->resType = ldt;
} else if (nodesIsRegularOp(pOp)) { } else if (nodesIsRegularOp(pOp)) {
if (pOp->pRight == NULL) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
SDataType rdt = ((SExprNode *)(pOp->pRight))->resType; SDataType rdt = ((SExprNode *)(pOp->pRight))->resType;
if (!IS_VAR_DATA_TYPE(ldt.type) || QUERY_NODE_VALUE != nodeType(pOp->pRight) || if (!IS_VAR_DATA_TYPE(ldt.type) || QUERY_NODE_VALUE != nodeType(pOp->pRight) ||
(!IS_STR_DATA_TYPE(rdt.type) && (rdt.type != TSDB_DATA_TYPE_NULL))) { (!IS_STR_DATA_TYPE(rdt.type) && (rdt.type != TSDB_DATA_TYPE_NULL))) {
@ -1523,8 +1542,13 @@ static int32_t sclGetCompOperatorResType(SOperatorNode *pOp) {
} }
static int32_t sclGetJsonOperatorResType(SOperatorNode *pOp) { static int32_t sclGetJsonOperatorResType(SOperatorNode *pOp) {
if (pOp == NULL || pOp->pLeft == NULL || pOp->pRight == NULL) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
SDataType ldt = ((SExprNode *)(pOp->pLeft))->resType; SDataType ldt = ((SExprNode *)(pOp->pLeft))->resType;
SDataType rdt = ((SExprNode *)(pOp->pRight))->resType; SDataType rdt = ((SExprNode *)(pOp->pRight))->resType;
if (TSDB_DATA_TYPE_JSON != ldt.type || !IS_STR_DATA_TYPE(rdt.type)) { if (TSDB_DATA_TYPE_JSON != ldt.type || !IS_STR_DATA_TYPE(rdt.type)) {
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
} }

View File

@ -1416,7 +1416,7 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
} else if (tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS) { } else if (tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS) {
timeVal[k] = timeVal[k] * 1000; timeVal[k] = timeVal[k] * 1000;
} else if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) { } else if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) {
timeVal[k] = timeVal[k]; timeVal[k] = timeVal[k] * 1;
} else { } else {
hasNull = true; hasNull = true;
break; break;

View File

@ -344,8 +344,11 @@ static FORCE_INLINE void varToNchar(char *buf, SScalarParam *pOut, int32_t rowIn
int32_t outputMaxLen = (inputLen + 1) * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE; int32_t outputMaxLen = (inputLen + 1) * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
char *t = taosMemoryCalloc(1, outputMaxLen); char *t = taosMemoryCalloc(1, outputMaxLen);
/*int32_t resLen = */ taosMbsToUcs4(varDataVal(buf), inputLen, (TdUcs4 *)varDataVal(t), int32_t ret = taosMbsToUcs4(varDataVal(buf), inputLen, (TdUcs4 *)varDataVal(t),
outputMaxLen - VARSTR_HEADER_SIZE, &len); outputMaxLen - VARSTR_HEADER_SIZE, &len);
if (!ret) {
sclError("failed to convert to NCHAR");
}
varDataSetLen(t, len); varDataSetLen(t, len);
colDataAppend(pOut->columnData, rowIndex, t, false); colDataAppend(pOut->columnData, rowIndex, t, false);