Merge remote-tracking branch 'origin/3.0' into fix/TD-19441
This commit is contained in:
commit
e1ef696a09
|
@ -51,6 +51,9 @@ extern "C" {
|
||||||
#endif
|
#endif
|
||||||
#else
|
#else
|
||||||
|
|
||||||
|
#ifndef __func__
|
||||||
|
#define __func__ __FUNCTION__
|
||||||
|
#endif
|
||||||
#include <malloc.h>
|
#include <malloc.h>
|
||||||
#include <time.h>
|
#include <time.h>
|
||||||
#ifndef TD_USE_WINSOCK
|
#ifndef TD_USE_WINSOCK
|
||||||
|
|
|
@ -117,9 +117,13 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
|
||||||
SArray *pArray = taosArrayInit(innerSz, sizeof(void *));
|
SArray *pArray = taosArrayInit(innerSz, sizeof(void *));
|
||||||
for (int32_t j = 0; j < innerSz; j++) {
|
for (int32_t j = 0; j < innerSz; j++) {
|
||||||
SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
||||||
if (pTask == NULL) return -1;
|
if (pTask == NULL) {
|
||||||
|
taosArrayDestroy(pArray);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
if (tDecodeSStreamTask(pDecoder, pTask) < 0) {
|
if (tDecodeSStreamTask(pDecoder, pTask) < 0) {
|
||||||
taosMemoryFree(pTask);
|
taosMemoryFree(pTask);
|
||||||
|
taosArrayDestroy(pArray);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
taosArrayPush(pArray, &pTask);
|
taosArrayPush(pArray, &pTask);
|
||||||
|
|
|
@ -180,6 +180,7 @@ int32_t tqMetaRestoreCheckInfo(STQ* pTq) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
tdbFree(pKey);
|
||||||
tdbTbcClose(pCur);
|
tdbTbcClose(pCur);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,7 +54,7 @@ int32_t tqOffsetSnapRead(STqOffsetReader* pReader, uint8_t** ppData) {
|
||||||
|
|
||||||
char* fname = tqOffsetBuildFName(pReader->pTq->path, 0);
|
char* fname = tqOffsetBuildFName(pReader->pTq->path, 0);
|
||||||
TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ);
|
TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ);
|
||||||
if (pFile != NULL) {
|
if (pFile == NULL) {
|
||||||
taosMemoryFree(fname);
|
taosMemoryFree(fname);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -767,6 +767,7 @@ typedef struct SStreamSessionAggOperatorInfo {
|
||||||
SPhysiNode* pPhyNode; // create new child
|
SPhysiNode* pPhyNode; // create new child
|
||||||
bool isFinal;
|
bool isFinal;
|
||||||
bool ignoreExpiredData;
|
bool ignoreExpiredData;
|
||||||
|
SHashObj* pGroupIdTbNameMap;
|
||||||
} SStreamSessionAggOperatorInfo;
|
} SStreamSessionAggOperatorInfo;
|
||||||
|
|
||||||
typedef struct SStreamPartitionOperatorInfo {
|
typedef struct SStreamPartitionOperatorInfo {
|
||||||
|
@ -845,6 +846,7 @@ typedef struct SStreamStateAggOperatorInfo {
|
||||||
void* pDelIterator;
|
void* pDelIterator;
|
||||||
SArray* pChildren; // cache for children's result;
|
SArray* pChildren; // cache for children's result;
|
||||||
bool ignoreExpiredData;
|
bool ignoreExpiredData;
|
||||||
|
SHashObj* pGroupIdTbNameMap;
|
||||||
} SStreamStateAggOperatorInfo;
|
} SStreamStateAggOperatorInfo;
|
||||||
|
|
||||||
typedef struct SSortOperatorInfo {
|
typedef struct SSortOperatorInfo {
|
||||||
|
@ -899,8 +901,12 @@ void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs);
|
||||||
int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
|
int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
|
||||||
const char* pkey);
|
const char* pkey);
|
||||||
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows);
|
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows);
|
||||||
|
|
||||||
|
void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
|
||||||
|
SDiskbasedBuf* pBuf);
|
||||||
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
|
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
|
||||||
SDiskbasedBuf* pBuf);
|
SDiskbasedBuf* pBuf);
|
||||||
|
|
||||||
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf);
|
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf);
|
||||||
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
|
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
|
||||||
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo);
|
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo);
|
||||||
|
|
|
@ -1389,6 +1389,46 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
|
||||||
|
SDiskbasedBuf* pBuf) {
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
SSDataBlock* pBlock = pbInfo->pRes;
|
||||||
|
|
||||||
|
// set output datablock version
|
||||||
|
pBlock->info.version = pTaskInfo->version;
|
||||||
|
|
||||||
|
blockDataCleanup(pBlock);
|
||||||
|
if (!hasRemainResults(pGroupResInfo)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// clear the existed group id
|
||||||
|
pBlock->info.groupId = 0;
|
||||||
|
ASSERT(!pbInfo->mergeResultBlock);
|
||||||
|
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo);
|
||||||
|
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
|
||||||
|
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
||||||
|
|
||||||
|
char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
|
||||||
|
if (tbname != NULL) {
|
||||||
|
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
|
||||||
|
} else {
|
||||||
|
pBlock->info.parTbName[0] = 0;
|
||||||
|
}
|
||||||
|
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
|
||||||
|
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
|
||||||
|
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
|
||||||
|
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||||
|
|
||||||
|
char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
|
||||||
|
if (tbname != NULL) {
|
||||||
|
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
|
||||||
|
} else {
|
||||||
|
pBlock->info.parTbName[0] = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
|
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
|
||||||
SDiskbasedBuf* pBuf) {
|
SDiskbasedBuf* pBuf) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
|
@ -3728,6 +3728,9 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
||||||
pInfo->pPhyNode = pPhyNode;
|
pInfo->pPhyNode = pPhyNode;
|
||||||
pInfo->ignoreExpiredData = pSessionNode->window.igExpired;
|
pInfo->ignoreExpiredData = pSessionNode->window.igExpired;
|
||||||
|
|
||||||
|
pInfo->pGroupIdTbNameMap =
|
||||||
|
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
|
||||||
|
|
||||||
pOperator->name = "StreamSessionWindowAggOperator";
|
pOperator->name = "StreamSessionWindowAggOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
|
||||||
pOperator->blocking = true;
|
pOperator->blocking = true;
|
||||||
|
@ -4351,7 +4354,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
||||||
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
|
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
|
||||||
return pInfo->pDelRes;
|
return pInfo->pDelRes;
|
||||||
}
|
}
|
||||||
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
doBuildStreamResBlock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
||||||
if (pBInfo->pRes->info.rows == 0 || !hasRemainResults(&pInfo->groupResInfo)) {
|
if (pBInfo->pRes->info.rows == 0 || !hasRemainResults(&pInfo->groupResInfo)) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
}
|
}
|
||||||
|
@ -4370,6 +4373,12 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "final session recv" : "single session recv");
|
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "final session recv" : "single session recv");
|
||||||
|
|
||||||
|
if (pBlock->info.parTbName[0]) {
|
||||||
|
taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName,
|
||||||
|
TSDB_TABLE_NAME_LEN);
|
||||||
|
/*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/
|
||||||
|
}
|
||||||
|
|
||||||
if (pBlock->info.type == STREAM_CLEAR) {
|
if (pBlock->info.type == STREAM_CLEAR) {
|
||||||
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
|
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
|
||||||
doClearSessionWindows(&pInfo->streamAggSup, &pOperator->exprSupp, pBlock, START_TS_COLUMN_INDEX,
|
doClearSessionWindows(&pInfo->streamAggSup, &pOperator->exprSupp, pBlock, START_TS_COLUMN_INDEX,
|
||||||
|
@ -4451,7 +4460,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
||||||
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
|
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
|
||||||
return pInfo->pDelRes;
|
return pInfo->pDelRes;
|
||||||
}
|
}
|
||||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
doBuildStreamResBlock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
||||||
printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
|
printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
|
||||||
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
|
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
|
||||||
}
|
}
|
||||||
|
@ -4482,7 +4491,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
doBuildStreamResBlock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
||||||
if (pBInfo->pRes->info.rows > 0) {
|
if (pBInfo->pRes->info.rows > 0) {
|
||||||
printDataBlock(pBInfo->pRes, "semi session");
|
printDataBlock(pBInfo->pRes, "semi session");
|
||||||
return pBInfo->pRes;
|
return pBInfo->pRes;
|
||||||
|
@ -4523,6 +4532,12 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
printDataBlock(pBlock, "semi session recv");
|
printDataBlock(pBlock, "semi session recv");
|
||||||
|
|
||||||
|
if (pBlock->info.parTbName[0]) {
|
||||||
|
taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName,
|
||||||
|
TSDB_TABLE_NAME_LEN);
|
||||||
|
/*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/
|
||||||
|
}
|
||||||
|
|
||||||
if (pBlock->info.type == STREAM_CLEAR) {
|
if (pBlock->info.type == STREAM_CLEAR) {
|
||||||
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
|
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
|
||||||
doClearSessionWindows(&pInfo->streamAggSup, pSup, pBlock, START_TS_COLUMN_INDEX, pSup->numOfExprs, 0, pWins);
|
doClearSessionWindows(&pInfo->streamAggSup, pSup, pBlock, START_TS_COLUMN_INDEX, pSup->numOfExprs, 0, pWins);
|
||||||
|
@ -4566,7 +4581,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
||||||
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
||||||
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
|
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
|
||||||
|
|
||||||
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
doBuildStreamResBlock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
||||||
if (pBInfo->pRes->info.rows > 0) {
|
if (pBInfo->pRes->info.rows > 0) {
|
||||||
printDataBlock(pBInfo->pRes, "semi session");
|
printDataBlock(pBInfo->pRes, "semi session");
|
||||||
return pBInfo->pRes;
|
return pBInfo->pRes;
|
||||||
|
@ -4610,6 +4625,10 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doStreamSessionSemiAgg, NULL, NULL,
|
createOperatorFpSet(operatorDummyOpenFn, doStreamSessionSemiAgg, NULL, NULL,
|
||||||
destroyStreamSessionAggOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
|
destroyStreamSessionAggOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pInfo->pGroupIdTbNameMap =
|
||||||
|
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
|
||||||
|
|
||||||
pOperator->operatorType = pPhyNode->type;
|
pOperator->operatorType = pPhyNode->type;
|
||||||
if (numOfChild > 0) {
|
if (numOfChild > 0) {
|
||||||
pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*));
|
pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*));
|
||||||
|
@ -4910,7 +4929,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
||||||
printDataBlock(pInfo->pDelRes, "single state");
|
printDataBlock(pInfo->pDelRes, "single state");
|
||||||
return pInfo->pDelRes;
|
return pInfo->pDelRes;
|
||||||
}
|
}
|
||||||
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
doBuildStreamResBlock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
||||||
if (pBInfo->pRes->info.rows == 0 || !hasRemainResults(&pInfo->groupResInfo)) {
|
if (pBInfo->pRes->info.rows == 0 || !hasRemainResults(&pInfo->groupResInfo)) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
}
|
}
|
||||||
|
@ -4972,7 +4991,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
||||||
printDataBlock(pInfo->pDelRes, "single state");
|
printDataBlock(pInfo->pDelRes, "single state");
|
||||||
return pInfo->pDelRes;
|
return pInfo->pDelRes;
|
||||||
}
|
}
|
||||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
doBuildStreamResBlock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
||||||
printDataBlock(pBInfo->pRes, "single state");
|
printDataBlock(pBInfo->pRes, "single state");
|
||||||
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
|
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
|
||||||
}
|
}
|
||||||
|
@ -5045,6 +5064,9 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
pInfo->pChildren = NULL;
|
pInfo->pChildren = NULL;
|
||||||
pInfo->ignoreExpiredData = pStateNode->window.igExpired;
|
pInfo->ignoreExpiredData = pStateNode->window.igExpired;
|
||||||
|
|
||||||
|
pInfo->pGroupIdTbNameMap =
|
||||||
|
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
|
||||||
|
|
||||||
pOperator->name = "StreamStateAggOperator";
|
pOperator->name = "StreamStateAggOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;
|
||||||
pOperator->blocking = true;
|
pOperator->blocking = true;
|
||||||
|
|
|
@ -709,8 +709,15 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
|
||||||
int16_t outputType = GET_PARAM_TYPE(&pOutput[0]);
|
int16_t outputType = GET_PARAM_TYPE(&pOutput[0]);
|
||||||
int64_t outputLen = GET_PARAM_BYTES(&pOutput[0]);
|
int64_t outputLen = GET_PARAM_BYTES(&pOutput[0]);
|
||||||
|
|
||||||
char *outputBuf = taosMemoryCalloc(outputLen * pInput[0].numOfRows + 1, 1);
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
char *output = outputBuf;
|
char * convBuf = taosMemoryMalloc(inputLen);
|
||||||
|
char * output = taosMemoryCalloc(1, outputLen + TSDB_NCHAR_SIZE);
|
||||||
|
char buf[400] = {0};
|
||||||
|
|
||||||
|
if (convBuf == NULL || output == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < pInput[0].numOfRows; ++i) {
|
for (int32_t i = 0; i < pInput[0].numOfRows; ++i) {
|
||||||
if (colDataIsNull_s(pInput[0].columnData, i)) {
|
if (colDataIsNull_s(pInput[0].columnData, i)) {
|
||||||
|
@ -723,17 +730,18 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
|
||||||
switch(outputType) {
|
switch(outputType) {
|
||||||
case TSDB_DATA_TYPE_TINYINT: {
|
case TSDB_DATA_TYPE_TINYINT: {
|
||||||
if (inputType == TSDB_DATA_TYPE_BINARY) {
|
if (inputType == TSDB_DATA_TYPE_BINARY) {
|
||||||
*(int8_t *)output = taosStr2Int8(varDataVal(input), NULL, 10);
|
memcpy(buf, varDataVal(input), varDataLen(input));
|
||||||
|
buf[varDataLen(input)] = 0;
|
||||||
|
*(int8_t *)output = taosStr2Int8(buf, NULL, 10);
|
||||||
} else if (inputType == TSDB_DATA_TYPE_NCHAR) {
|
} else if (inputType == TSDB_DATA_TYPE_NCHAR) {
|
||||||
char *newBuf = taosMemoryCalloc(1, inputLen);
|
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), convBuf);
|
||||||
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf);
|
|
||||||
if (len < 0) {
|
if (len < 0) {
|
||||||
taosMemoryFree(newBuf);
|
code = TSDB_CODE_FAILED;
|
||||||
return TSDB_CODE_FAILED;
|
goto _end;
|
||||||
}
|
}
|
||||||
newBuf[len] = 0;
|
|
||||||
*(int8_t *)output = taosStr2Int8(newBuf, NULL, 10);
|
convBuf[len] = 0;
|
||||||
taosMemoryFree(newBuf);
|
*(int8_t *)output = taosStr2Int8(convBuf, NULL, 10);
|
||||||
} else {
|
} else {
|
||||||
GET_TYPED_DATA(*(int8_t *)output, int8_t, inputType, input);
|
GET_TYPED_DATA(*(int8_t *)output, int8_t, inputType, input);
|
||||||
}
|
}
|
||||||
|
@ -741,17 +749,17 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
|
||||||
}
|
}
|
||||||
case TSDB_DATA_TYPE_SMALLINT: {
|
case TSDB_DATA_TYPE_SMALLINT: {
|
||||||
if (inputType == TSDB_DATA_TYPE_BINARY) {
|
if (inputType == TSDB_DATA_TYPE_BINARY) {
|
||||||
*(int16_t *)output = taosStr2Int16(varDataVal(input), NULL, 10);
|
memcpy(buf, varDataVal(input), varDataLen(input));
|
||||||
|
buf[varDataLen(input)] = 0;
|
||||||
|
*(int16_t *)output = taosStr2Int16(buf, NULL, 10);
|
||||||
} else if (inputType == TSDB_DATA_TYPE_NCHAR) {
|
} else if (inputType == TSDB_DATA_TYPE_NCHAR) {
|
||||||
char *newBuf = taosMemoryCalloc(1, inputLen);
|
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), convBuf);
|
||||||
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf);
|
|
||||||
if (len < 0) {
|
if (len < 0) {
|
||||||
taosMemoryFree(newBuf);
|
code = TSDB_CODE_FAILED;
|
||||||
return TSDB_CODE_FAILED;
|
goto _end;
|
||||||
}
|
}
|
||||||
newBuf[len] = 0;
|
convBuf[len] = 0;
|
||||||
*(int16_t *)output = taosStr2Int16(newBuf, NULL, 10);
|
*(int16_t *)output = taosStr2Int16(convBuf, NULL, 10);
|
||||||
taosMemoryFree(newBuf);
|
|
||||||
} else {
|
} else {
|
||||||
GET_TYPED_DATA(*(int16_t *)output, int16_t, inputType, input);
|
GET_TYPED_DATA(*(int16_t *)output, int16_t, inputType, input);
|
||||||
}
|
}
|
||||||
|
@ -759,17 +767,18 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
|
||||||
}
|
}
|
||||||
case TSDB_DATA_TYPE_INT: {
|
case TSDB_DATA_TYPE_INT: {
|
||||||
if (inputType == TSDB_DATA_TYPE_BINARY) {
|
if (inputType == TSDB_DATA_TYPE_BINARY) {
|
||||||
*(int32_t *)output = taosStr2Int32(varDataVal(input), NULL, 10);
|
memcpy(buf, varDataVal(input), varDataLen(input));
|
||||||
|
buf[varDataLen(input)] = 0;
|
||||||
|
*(int32_t *)output = taosStr2Int32(buf, NULL, 10);
|
||||||
} else if (inputType == TSDB_DATA_TYPE_NCHAR) {
|
} else if (inputType == TSDB_DATA_TYPE_NCHAR) {
|
||||||
char *newBuf = taosMemoryCalloc(1, inputLen);
|
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), convBuf);
|
||||||
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf);
|
|
||||||
if (len < 0) {
|
if (len < 0) {
|
||||||
taosMemoryFree(newBuf);
|
code = TSDB_CODE_FAILED;
|
||||||
return TSDB_CODE_FAILED;
|
goto _end;
|
||||||
}
|
}
|
||||||
newBuf[len] = 0;
|
|
||||||
*(int32_t *)output = taosStr2Int32(newBuf, NULL, 10);
|
convBuf[len] = 0;
|
||||||
taosMemoryFree(newBuf);
|
*(int32_t *)output = taosStr2Int32(convBuf, NULL, 10);
|
||||||
} else {
|
} else {
|
||||||
GET_TYPED_DATA(*(int32_t *)output, int32_t, inputType, input);
|
GET_TYPED_DATA(*(int32_t *)output, int32_t, inputType, input);
|
||||||
}
|
}
|
||||||
|
@ -777,17 +786,17 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
|
||||||
}
|
}
|
||||||
case TSDB_DATA_TYPE_BIGINT: {
|
case TSDB_DATA_TYPE_BIGINT: {
|
||||||
if (inputType == TSDB_DATA_TYPE_BINARY) {
|
if (inputType == TSDB_DATA_TYPE_BINARY) {
|
||||||
*(int64_t *)output = taosStr2Int64(varDataVal(input), NULL, 10);
|
memcpy(buf, varDataVal(input), varDataLen(input));
|
||||||
|
buf[varDataLen(input)] = 0;
|
||||||
|
*(int64_t *)output = taosStr2Int64(buf, NULL, 10);
|
||||||
} else if (inputType == TSDB_DATA_TYPE_NCHAR) {
|
} else if (inputType == TSDB_DATA_TYPE_NCHAR) {
|
||||||
char *newBuf = taosMemoryCalloc(1, inputLen);
|
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), convBuf);
|
||||||
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf);
|
|
||||||
if (len < 0) {
|
if (len < 0) {
|
||||||
taosMemoryFree(newBuf);
|
code = TSDB_CODE_FAILED;
|
||||||
return TSDB_CODE_FAILED;
|
goto _end;
|
||||||
}
|
}
|
||||||
newBuf[len] = 0;
|
convBuf[len] = 0;
|
||||||
*(int64_t *)output = taosStr2Int64(newBuf, NULL, 10);
|
*(int64_t *)output = taosStr2Int64(convBuf, NULL, 10);
|
||||||
taosMemoryFree(newBuf);
|
|
||||||
} else {
|
} else {
|
||||||
GET_TYPED_DATA(*(int64_t *)output, int64_t, inputType, input);
|
GET_TYPED_DATA(*(int64_t *)output, int64_t, inputType, input);
|
||||||
}
|
}
|
||||||
|
@ -795,17 +804,17 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
|
||||||
}
|
}
|
||||||
case TSDB_DATA_TYPE_UTINYINT: {
|
case TSDB_DATA_TYPE_UTINYINT: {
|
||||||
if (inputType == TSDB_DATA_TYPE_BINARY) {
|
if (inputType == TSDB_DATA_TYPE_BINARY) {
|
||||||
*(uint8_t *)output = taosStr2UInt8(varDataVal(input), NULL, 10);
|
memcpy(buf, varDataVal(input), varDataLen(input));
|
||||||
|
buf[varDataLen(input)] = 0;
|
||||||
|
*(uint8_t *)output = taosStr2UInt8(buf, NULL, 10);
|
||||||
} else if (inputType == TSDB_DATA_TYPE_NCHAR) {
|
} else if (inputType == TSDB_DATA_TYPE_NCHAR) {
|
||||||
char *newBuf = taosMemoryCalloc(1, inputLen);
|
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), convBuf);
|
||||||
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf);
|
|
||||||
if (len < 0) {
|
if (len < 0) {
|
||||||
taosMemoryFree(newBuf);
|
code = TSDB_CODE_FAILED;
|
||||||
return TSDB_CODE_FAILED;
|
goto _end;
|
||||||
}
|
}
|
||||||
newBuf[len] = 0;
|
convBuf[len] = 0;
|
||||||
*(uint8_t *)output = taosStr2UInt8(newBuf, NULL, 10);
|
*(uint8_t *)output = taosStr2UInt8(convBuf, NULL, 10);
|
||||||
taosMemoryFree(newBuf);
|
|
||||||
} else {
|
} else {
|
||||||
GET_TYPED_DATA(*(uint8_t *)output, uint8_t, inputType, input);
|
GET_TYPED_DATA(*(uint8_t *)output, uint8_t, inputType, input);
|
||||||
}
|
}
|
||||||
|
@ -813,17 +822,17 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
|
||||||
}
|
}
|
||||||
case TSDB_DATA_TYPE_USMALLINT: {
|
case TSDB_DATA_TYPE_USMALLINT: {
|
||||||
if (inputType == TSDB_DATA_TYPE_BINARY) {
|
if (inputType == TSDB_DATA_TYPE_BINARY) {
|
||||||
*(uint16_t *)output = taosStr2UInt16(varDataVal(input), NULL, 10);
|
memcpy(buf, varDataVal(input), varDataLen(input));
|
||||||
|
buf[varDataLen(input)] = 0;
|
||||||
|
*(uint16_t *)output = taosStr2UInt16(buf, NULL, 10);
|
||||||
} else if (inputType == TSDB_DATA_TYPE_NCHAR) {
|
} else if (inputType == TSDB_DATA_TYPE_NCHAR) {
|
||||||
char *newBuf = taosMemoryCalloc(1, inputLen);
|
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), convBuf);
|
||||||
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf);
|
|
||||||
if (len < 0) {
|
if (len < 0) {
|
||||||
taosMemoryFree(newBuf);
|
code = TSDB_CODE_FAILED;
|
||||||
return TSDB_CODE_FAILED;
|
goto _end;
|
||||||
}
|
}
|
||||||
newBuf[len] = 0;
|
convBuf[len] = 0;
|
||||||
*(uint16_t *)output = taosStr2UInt16(newBuf, NULL, 10);
|
*(uint16_t *)output = taosStr2UInt16(convBuf, NULL, 10);
|
||||||
taosMemoryFree(newBuf);
|
|
||||||
} else {
|
} else {
|
||||||
GET_TYPED_DATA(*(uint16_t *)output, uint16_t, inputType, input);
|
GET_TYPED_DATA(*(uint16_t *)output, uint16_t, inputType, input);
|
||||||
}
|
}
|
||||||
|
@ -831,17 +840,17 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
|
||||||
}
|
}
|
||||||
case TSDB_DATA_TYPE_UINT: {
|
case TSDB_DATA_TYPE_UINT: {
|
||||||
if (inputType == TSDB_DATA_TYPE_BINARY) {
|
if (inputType == TSDB_DATA_TYPE_BINARY) {
|
||||||
*(uint32_t *)output = taosStr2UInt32(varDataVal(input), NULL, 10);
|
memcpy(buf, varDataVal(input), varDataLen(input));
|
||||||
|
buf[varDataLen(input)] = 0;
|
||||||
|
*(uint32_t *)output = taosStr2UInt32(buf, NULL, 10);
|
||||||
} else if (inputType == TSDB_DATA_TYPE_NCHAR) {
|
} else if (inputType == TSDB_DATA_TYPE_NCHAR) {
|
||||||
char *newBuf = taosMemoryCalloc(1, inputLen);
|
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), convBuf);
|
||||||
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf);
|
|
||||||
if (len < 0) {
|
if (len < 0) {
|
||||||
taosMemoryFree(newBuf);
|
code = TSDB_CODE_FAILED;
|
||||||
return TSDB_CODE_FAILED;
|
goto _end;
|
||||||
}
|
}
|
||||||
newBuf[len] = 0;
|
convBuf[len] = 0;
|
||||||
*(uint32_t *)output = taosStr2UInt32(newBuf, NULL, 10);
|
*(uint32_t *)output = taosStr2UInt32(convBuf, NULL, 10);
|
||||||
taosMemoryFree(newBuf);
|
|
||||||
} else {
|
} else {
|
||||||
GET_TYPED_DATA(*(uint32_t *)output, uint32_t, inputType, input);
|
GET_TYPED_DATA(*(uint32_t *)output, uint32_t, inputType, input);
|
||||||
}
|
}
|
||||||
|
@ -849,17 +858,18 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
|
||||||
}
|
}
|
||||||
case TSDB_DATA_TYPE_UBIGINT: {
|
case TSDB_DATA_TYPE_UBIGINT: {
|
||||||
if (inputType == TSDB_DATA_TYPE_BINARY) {
|
if (inputType == TSDB_DATA_TYPE_BINARY) {
|
||||||
*(uint64_t *)output = taosStr2UInt64(varDataVal(input), NULL, 10);
|
memcpy(buf, varDataVal(input), varDataLen(input));
|
||||||
|
buf[varDataLen(input)] = 0;
|
||||||
|
*(uint64_t *)output = taosStr2UInt64(buf, NULL, 10);
|
||||||
} else if (inputType == TSDB_DATA_TYPE_NCHAR) {
|
} else if (inputType == TSDB_DATA_TYPE_NCHAR) {
|
||||||
char *newBuf = taosMemoryCalloc(1, inputLen);
|
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), convBuf);
|
||||||
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf);
|
|
||||||
if (len < 0) {
|
if (len < 0) {
|
||||||
taosMemoryFree(newBuf);
|
code = TSDB_CODE_FAILED;
|
||||||
return TSDB_CODE_FAILED;
|
goto _end;
|
||||||
}
|
}
|
||||||
newBuf[len] = 0;
|
|
||||||
*(uint64_t *)output = taosStr2UInt64(newBuf, NULL, 10);
|
convBuf[len] = 0;
|
||||||
taosMemoryFree(newBuf);
|
*(uint64_t *)output = taosStr2UInt64(convBuf, NULL, 10);
|
||||||
} else {
|
} else {
|
||||||
GET_TYPED_DATA(*(uint64_t *)output, uint64_t, inputType, input);
|
GET_TYPED_DATA(*(uint64_t *)output, uint64_t, inputType, input);
|
||||||
}
|
}
|
||||||
|
@ -867,17 +877,17 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
|
||||||
}
|
}
|
||||||
case TSDB_DATA_TYPE_FLOAT: {
|
case TSDB_DATA_TYPE_FLOAT: {
|
||||||
if (inputType == TSDB_DATA_TYPE_BINARY) {
|
if (inputType == TSDB_DATA_TYPE_BINARY) {
|
||||||
*(float *)output = taosStr2Float(varDataVal(input), NULL);
|
memcpy(buf, varDataVal(input), varDataLen(input));
|
||||||
|
buf[varDataLen(input)] = 0;
|
||||||
|
*(float *)output = taosStr2Float(buf, NULL);
|
||||||
} else if (inputType == TSDB_DATA_TYPE_NCHAR) {
|
} else if (inputType == TSDB_DATA_TYPE_NCHAR) {
|
||||||
char *newBuf = taosMemoryCalloc(1, inputLen);
|
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), convBuf);
|
||||||
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf);
|
|
||||||
if (len < 0) {
|
if (len < 0) {
|
||||||
taosMemoryFree(newBuf);
|
code = TSDB_CODE_FAILED;
|
||||||
return TSDB_CODE_FAILED;
|
goto _end;
|
||||||
}
|
}
|
||||||
newBuf[len] = 0;
|
convBuf[len] = 0;
|
||||||
*(float *)output = taosStr2Float(newBuf, NULL);
|
*(float *)output = taosStr2Float(convBuf, NULL);
|
||||||
taosMemoryFree(newBuf);
|
|
||||||
} else {
|
} else {
|
||||||
GET_TYPED_DATA(*(float *)output, float, inputType, input);
|
GET_TYPED_DATA(*(float *)output, float, inputType, input);
|
||||||
}
|
}
|
||||||
|
@ -885,17 +895,17 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
|
||||||
}
|
}
|
||||||
case TSDB_DATA_TYPE_DOUBLE: {
|
case TSDB_DATA_TYPE_DOUBLE: {
|
||||||
if (inputType == TSDB_DATA_TYPE_BINARY) {
|
if (inputType == TSDB_DATA_TYPE_BINARY) {
|
||||||
*(double *)output = taosStr2Double(varDataVal(input), NULL);
|
memcpy(buf, varDataVal(input), varDataLen(input));
|
||||||
|
buf[varDataLen(input)] = 0;
|
||||||
|
*(double *)output = taosStr2Double(buf, NULL);
|
||||||
} else if (inputType == TSDB_DATA_TYPE_NCHAR) {
|
} else if (inputType == TSDB_DATA_TYPE_NCHAR) {
|
||||||
char *newBuf = taosMemoryCalloc(1, inputLen);
|
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), convBuf);
|
||||||
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf);
|
|
||||||
if (len < 0) {
|
if (len < 0) {
|
||||||
taosMemoryFree(newBuf);
|
code = TSDB_CODE_FAILED;
|
||||||
return TSDB_CODE_FAILED;
|
goto _end;
|
||||||
}
|
}
|
||||||
newBuf[len] = 0;
|
convBuf[len] = 0;
|
||||||
*(double *)output = taosStr2Double(newBuf, NULL);
|
*(double *)output = taosStr2Double(convBuf, NULL);
|
||||||
taosMemoryFree(newBuf);
|
|
||||||
} else {
|
} else {
|
||||||
GET_TYPED_DATA(*(double *)output, double, inputType, input);
|
GET_TYPED_DATA(*(double *)output, double, inputType, input);
|
||||||
}
|
}
|
||||||
|
@ -903,17 +913,17 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
|
||||||
}
|
}
|
||||||
case TSDB_DATA_TYPE_BOOL: {
|
case TSDB_DATA_TYPE_BOOL: {
|
||||||
if (inputType == TSDB_DATA_TYPE_BINARY) {
|
if (inputType == TSDB_DATA_TYPE_BINARY) {
|
||||||
*(bool *)output = taosStr2Int8(varDataVal(input), NULL, 10);
|
memcpy(buf, varDataVal(input), varDataLen(input));
|
||||||
|
buf[varDataLen(input)] = 0;
|
||||||
|
*(bool *)output = taosStr2Int8(buf, NULL, 10);
|
||||||
} else if (inputType == TSDB_DATA_TYPE_NCHAR) {
|
} else if (inputType == TSDB_DATA_TYPE_NCHAR) {
|
||||||
char *newBuf = taosMemoryCalloc(1, inputLen);
|
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), convBuf);
|
||||||
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf);
|
|
||||||
if (len < 0) {
|
if (len < 0) {
|
||||||
taosMemoryFree(newBuf);
|
code = TSDB_CODE_FAILED;
|
||||||
return TSDB_CODE_FAILED;
|
goto _end;
|
||||||
}
|
}
|
||||||
newBuf[len] = 0;
|
convBuf[len] = 0;
|
||||||
*(bool *)output = taosStr2Int8(newBuf, NULL, 10);
|
*(bool *)output = taosStr2Int8(convBuf, NULL, 10);
|
||||||
taosMemoryFree(newBuf);
|
|
||||||
} else {
|
} else {
|
||||||
GET_TYPED_DATA(*(bool *)output, bool, inputType, input);
|
GET_TYPED_DATA(*(bool *)output, bool, inputType, input);
|
||||||
}
|
}
|
||||||
|
@ -937,29 +947,27 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
|
||||||
}
|
}
|
||||||
case TSDB_DATA_TYPE_BINARY: {
|
case TSDB_DATA_TYPE_BINARY: {
|
||||||
if (inputType == TSDB_DATA_TYPE_BOOL) {
|
if (inputType == TSDB_DATA_TYPE_BOOL) {
|
||||||
|
// NOTE: sprintf will append '\0' at the end of string
|
||||||
int32_t len = sprintf(varDataVal(output), "%.*s", (int32_t)(outputLen - VARSTR_HEADER_SIZE), *(int8_t *)input ? "true" : "false");
|
int32_t len = sprintf(varDataVal(output), "%.*s", (int32_t)(outputLen - VARSTR_HEADER_SIZE), *(int8_t *)input ? "true" : "false");
|
||||||
varDataSetLen(output, len);
|
varDataSetLen(output, len);
|
||||||
} else if (inputType == TSDB_DATA_TYPE_BINARY) {
|
} else if (inputType == TSDB_DATA_TYPE_BINARY) {
|
||||||
int32_t len = TMIN(varDataLen(input), outputLen - VARSTR_HEADER_SIZE);
|
int32_t len = TMIN(varDataLen(input), outputLen - VARSTR_HEADER_SIZE);
|
||||||
len = sprintf(varDataVal(output), "%.*s", len, varDataVal(input));
|
memcpy(varDataVal(output), varDataVal(input), len);
|
||||||
varDataSetLen(output, len);
|
varDataSetLen(output, len);
|
||||||
} else if (inputType == TSDB_DATA_TYPE_NCHAR) {
|
} else if (inputType == TSDB_DATA_TYPE_NCHAR) {
|
||||||
char *newBuf = taosMemoryCalloc(1, inputLen);
|
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), convBuf);
|
||||||
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf);
|
|
||||||
if (len < 0) {
|
if (len < 0) {
|
||||||
taosMemoryFree(newBuf);
|
code = TSDB_CODE_FAILED;
|
||||||
return TSDB_CODE_FAILED;
|
goto _end;
|
||||||
}
|
}
|
||||||
len = TMIN(len, outputLen - VARSTR_HEADER_SIZE);
|
len = TMIN(len, outputLen - VARSTR_HEADER_SIZE);
|
||||||
memcpy(varDataVal(output), newBuf, len);
|
memcpy(varDataVal(output), convBuf, len);
|
||||||
varDataSetLen(output, len);
|
varDataSetLen(output, len);
|
||||||
taosMemoryFree(newBuf);
|
|
||||||
} else {
|
} else {
|
||||||
char tmp[400] = {0};
|
NUM_TO_STRING(inputType, input, sizeof(buf), buf);
|
||||||
NUM_TO_STRING(inputType, input, sizeof(tmp), tmp);
|
int32_t len = (int32_t)strlen(buf);
|
||||||
int32_t len = (int32_t)strlen(tmp);
|
|
||||||
len = (outputLen - VARSTR_HEADER_SIZE) > len ? len : (outputLen - VARSTR_HEADER_SIZE);
|
len = (outputLen - VARSTR_HEADER_SIZE) > len ? len : (outputLen - VARSTR_HEADER_SIZE);
|
||||||
memcpy(varDataVal(output), tmp, len);
|
memcpy(varDataVal(output), buf, len);
|
||||||
varDataSetLen(output, len);
|
varDataSetLen(output, len);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -972,14 +980,17 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
|
||||||
len = sprintf(tmp, "%.*s", outputCharLen, *(int8_t *)input ? "true" : "false" );
|
len = sprintf(tmp, "%.*s", outputCharLen, *(int8_t *)input ? "true" : "false" );
|
||||||
bool ret = taosMbsToUcs4(tmp, len, (TdUcs4 *)varDataVal(output), outputLen - VARSTR_HEADER_SIZE, &len);
|
bool ret = taosMbsToUcs4(tmp, len, (TdUcs4 *)varDataVal(output), outputLen - VARSTR_HEADER_SIZE, &len);
|
||||||
if (!ret) {
|
if (!ret) {
|
||||||
return TSDB_CODE_FAILED;
|
code = TSDB_CODE_FAILED;
|
||||||
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
varDataSetLen(output, len);
|
varDataSetLen(output, len);
|
||||||
} else if (inputType == TSDB_DATA_TYPE_BINARY) {
|
} else if (inputType == TSDB_DATA_TYPE_BINARY) {
|
||||||
len = outputCharLen > varDataLen(input) ? varDataLen(input) : outputCharLen;
|
len = outputCharLen > varDataLen(input) ? varDataLen(input) : outputCharLen;
|
||||||
bool ret = taosMbsToUcs4(input + VARSTR_HEADER_SIZE, len, (TdUcs4 *)varDataVal(output), outputLen - VARSTR_HEADER_SIZE, &len);
|
bool ret = taosMbsToUcs4(input + VARSTR_HEADER_SIZE, len, (TdUcs4 *)varDataVal(output), outputLen - VARSTR_HEADER_SIZE, &len);
|
||||||
if (!ret) {
|
if (!ret) {
|
||||||
return TSDB_CODE_FAILED;
|
code = TSDB_CODE_FAILED;
|
||||||
|
goto _end;
|
||||||
}
|
}
|
||||||
varDataSetLen(output, len);
|
varDataSetLen(output, len);
|
||||||
} else if (inputType == TSDB_DATA_TYPE_NCHAR) {
|
} else if (inputType == TSDB_DATA_TYPE_NCHAR) {
|
||||||
|
@ -987,38 +998,39 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
|
||||||
memcpy(output, input, len + VARSTR_HEADER_SIZE);
|
memcpy(output, input, len + VARSTR_HEADER_SIZE);
|
||||||
varDataSetLen(output, len);
|
varDataSetLen(output, len);
|
||||||
} else {
|
} else {
|
||||||
char tmp[400] = {0};
|
NUM_TO_STRING(inputType, input, sizeof(buf), buf);
|
||||||
NUM_TO_STRING(inputType, input, sizeof(tmp), tmp);
|
len = (int32_t)strlen(buf);
|
||||||
len = (int32_t)strlen(tmp);
|
|
||||||
len = outputCharLen > len ? len : outputCharLen;
|
len = outputCharLen > len ? len : outputCharLen;
|
||||||
bool ret = taosMbsToUcs4(tmp, len, (TdUcs4 *)varDataVal(output), outputLen - VARSTR_HEADER_SIZE, &len);
|
bool ret = taosMbsToUcs4(buf, len, (TdUcs4 *)varDataVal(output), outputLen - VARSTR_HEADER_SIZE, &len);
|
||||||
if (!ret) {
|
if (!ret) {
|
||||||
return TSDB_CODE_FAILED;
|
code = TSDB_CODE_FAILED;
|
||||||
|
goto _end;
|
||||||
}
|
}
|
||||||
varDataSetLen(output, len);
|
varDataSetLen(output, len);
|
||||||
}
|
}
|
||||||
|
|
||||||
//for constant conversion, need to set proper length of pOutput description
|
//for constant conversion, need to set proper length of pOutput description
|
||||||
if (len < outputLen) {
|
if (len < outputLen) {
|
||||||
pOutput->columnData->info.bytes = len + VARSTR_HEADER_SIZE;
|
pOutput->columnData->info.bytes = len + VARSTR_HEADER_SIZE;
|
||||||
}
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
default: {
|
default: {
|
||||||
return TSDB_CODE_FAILED;
|
code = TSDB_CODE_FAILED;
|
||||||
|
goto _end;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
colDataAppend(pOutput->columnData, i, output, false);
|
colDataAppend(pOutput->columnData, i, output, false);
|
||||||
if (IS_VAR_DATA_TYPE(outputType)) {
|
|
||||||
output += varDataTLen(output);
|
|
||||||
} else {
|
|
||||||
output += tDataTypes[outputType].bytes;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pOutput->numOfRows = pInput->numOfRows;
|
pOutput->numOfRows = pInput->numOfRows;
|
||||||
taosMemoryFree(outputBuf);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
_end:
|
||||||
|
taosMemoryFree(output);
|
||||||
|
taosMemoryFree(convBuf);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||||
|
@ -1400,8 +1412,6 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
|
||||||
} else if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_TIMESTAMP) { /* unix timestamp or ts column*/
|
} else if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_TIMESTAMP) { /* unix timestamp or ts column*/
|
||||||
GET_TYPED_DATA(timeVal[k], int64_t, type, input[k]);
|
GET_TYPED_DATA(timeVal[k], int64_t, type, input[k]);
|
||||||
if (type == TSDB_DATA_TYPE_TIMESTAMP) {
|
if (type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||||
int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 :
|
|
||||||
(timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000);
|
|
||||||
int64_t timeValSec = timeVal[k] / factor;
|
int64_t timeValSec = timeVal[k] / factor;
|
||||||
if (timeValSec < 1000000000) {
|
if (timeValSec < 1000000000) {
|
||||||
timeVal[k] = timeValSec;
|
timeVal[k] = timeValSec;
|
||||||
|
|
|
@ -1485,7 +1485,7 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell, int dropOfp, TXN *
|
||||||
if (dropOfp) {
|
if (dropOfp) {
|
||||||
int ret = 0;
|
int ret = 0;
|
||||||
SPgno pgno = *(SPgno *)(pCell + nLocal - sizeof(SPgno));
|
SPgno pgno = *(SPgno *)(pCell + nLocal - sizeof(SPgno));
|
||||||
int nLeft = nPayload - nLocal + sizeof(SPgno);
|
int nLeft = nPayload - nLocal + sizeof(SPgno) + nHeader;
|
||||||
SPage *ofp;
|
SPage *ofp;
|
||||||
int bytes;
|
int bytes;
|
||||||
|
|
||||||
|
|
|
@ -238,8 +238,8 @@ TEST(TdbOVFLPagesTest, TbGetTest) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(TdbOVFLPagesTest, DISABLED_TbDeleteTest) {
|
// TEST(TdbOVFLPagesTest, DISABLED_TbDeleteTest) {
|
||||||
// TEST(TdbOVFLPagesTest, TbDeleteTest) {
|
TEST(TdbOVFLPagesTest, TbDeleteTest) {
|
||||||
int ret = 0;
|
int ret = 0;
|
||||||
|
|
||||||
taosRemoveDir("tdb");
|
taosRemoveDir("tdb");
|
||||||
|
@ -267,7 +267,8 @@ TEST(TdbOVFLPagesTest, DISABLED_TbDeleteTest) {
|
||||||
tdbBegin(pEnv, &txn);
|
tdbBegin(pEnv, &txn);
|
||||||
|
|
||||||
// generate value payload
|
// generate value payload
|
||||||
char val[((4083 - 4 - 3 - 2) + 1) * 100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
|
// char val[((4083 - 4 - 3 - 2) + 1) * 100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
|
||||||
|
char val[((4083 - 4 - 3 - 2) + 1) * 2]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
|
||||||
int valLen = sizeof(val) / sizeof(val[0]);
|
int valLen = sizeof(val) / sizeof(val[0]);
|
||||||
generateBigVal(val, valLen);
|
generateBigVal(val, valLen);
|
||||||
|
|
||||||
|
@ -340,8 +341,8 @@ tdbBegin(pEnv, &txn);
|
||||||
tdbTxnClose(&txn);
|
tdbTxnClose(&txn);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(tdb_test, DISABLED_simple_insert1) {
|
// TEST(tdb_test, DISABLED_simple_insert1) {
|
||||||
// TEST(tdb_test, simple_insert1) {
|
TEST(tdb_test, simple_insert1) {
|
||||||
int ret;
|
int ret;
|
||||||
TDB *pEnv;
|
TDB *pEnv;
|
||||||
TTB *pDb;
|
TTB *pDb;
|
||||||
|
|
|
@ -150,7 +150,6 @@ int walCheckAndRepairMeta(SWal* pWal) {
|
||||||
const char* idxPattern = "^[0-9]+.idx$";
|
const char* idxPattern = "^[0-9]+.idx$";
|
||||||
regex_t logRegPattern;
|
regex_t logRegPattern;
|
||||||
regex_t idxRegPattern;
|
regex_t idxRegPattern;
|
||||||
SArray* actualLog = taosArrayInit(8, sizeof(SWalFileInfo));
|
|
||||||
|
|
||||||
regcomp(&logRegPattern, logPattern, REG_EXTENDED);
|
regcomp(&logRegPattern, logPattern, REG_EXTENDED);
|
||||||
regcomp(&idxRegPattern, idxPattern, REG_EXTENDED);
|
regcomp(&idxRegPattern, idxPattern, REG_EXTENDED);
|
||||||
|
@ -163,6 +162,8 @@ int walCheckAndRepairMeta(SWal* pWal) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SArray* actualLog = taosArrayInit(8, sizeof(SWalFileInfo));
|
||||||
|
|
||||||
// scan log files and build new meta
|
// scan log files and build new meta
|
||||||
TdDirEntryPtr pDirEntry;
|
TdDirEntryPtr pDirEntry;
|
||||||
while ((pDirEntry = taosReadDir(pDir)) != NULL) {
|
while ((pDirEntry = taosReadDir(pDir)) != NULL) {
|
||||||
|
|
|
@ -267,21 +267,29 @@ int32_t taosSetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
#ifdef WINDOWS
|
#ifdef WINDOWS
|
||||||
|
#ifdef TCP_KEEPCNT
|
||||||
if (level == SOL_SOCKET && optname == TCP_KEEPCNT) {
|
if (level == SOL_SOCKET && optname == TCP_KEEPCNT) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#ifdef TCP_KEEPIDLE
|
||||||
if (level == SOL_TCP && optname == TCP_KEEPIDLE) {
|
if (level == SOL_TCP && optname == TCP_KEEPIDLE) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#ifdef TCP_KEEPINTVL
|
||||||
if (level == SOL_TCP && optname == TCP_KEEPINTVL) {
|
if (level == SOL_TCP && optname == TCP_KEEPINTVL) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#ifdef TCP_KEEPCNT
|
||||||
if (level == SOL_TCP && optname == TCP_KEEPCNT) {
|
if (level == SOL_TCP && optname == TCP_KEEPCNT) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
return setsockopt(pSocket->fd, level, optname, optval, optlen);
|
return setsockopt(pSocket->fd, level, optname, optval, optlen);
|
||||||
#else
|
#else
|
||||||
|
@ -601,26 +609,32 @@ int32_t taosKeepTcpAlive(TdSocketPtr pSocket) {
|
||||||
|
|
||||||
#ifndef __APPLE__
|
#ifndef __APPLE__
|
||||||
// all fails on macosx
|
// all fails on macosx
|
||||||
|
#ifdef TCP_KEEPCNT
|
||||||
int32_t probes = 3;
|
int32_t probes = 3;
|
||||||
if (taosSetSockOpt(pSocket, SOL_TCP, TCP_KEEPCNT, (void *)&probes, sizeof(probes)) < 0) {
|
if (taosSetSockOpt(pSocket, SOL_TCP, TCP_KEEPCNT, (void *)&probes, sizeof(probes)) < 0) {
|
||||||
// printf("fd:%d setsockopt SO_KEEPCNT failed: %d (%s)", sockFd, errno, strerror(errno));
|
// printf("fd:%d setsockopt SO_KEEPCNT failed: %d (%s)", sockFd, errno, strerror(errno));
|
||||||
taosCloseSocket(&pSocket);
|
taosCloseSocket(&pSocket);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#ifdef TCP_KEEPIDLE
|
||||||
int32_t alivetime = 10;
|
int32_t alivetime = 10;
|
||||||
if (taosSetSockOpt(pSocket, SOL_TCP, TCP_KEEPIDLE, (void *)&alivetime, sizeof(alivetime)) < 0) {
|
if (taosSetSockOpt(pSocket, SOL_TCP, TCP_KEEPIDLE, (void *)&alivetime, sizeof(alivetime)) < 0) {
|
||||||
// printf("fd:%d setsockopt SO_KEEPIDLE failed: %d (%s)", sockFd, errno, strerror(errno));
|
// printf("fd:%d setsockopt SO_KEEPIDLE failed: %d (%s)", sockFd, errno, strerror(errno));
|
||||||
taosCloseSocket(&pSocket);
|
taosCloseSocket(&pSocket);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#ifdef TCP_KEEPINTVL
|
||||||
int32_t interval = 3;
|
int32_t interval = 3;
|
||||||
if (taosSetSockOpt(pSocket, SOL_TCP, TCP_KEEPINTVL, (void *)&interval, sizeof(interval)) < 0) {
|
if (taosSetSockOpt(pSocket, SOL_TCP, TCP_KEEPINTVL, (void *)&interval, sizeof(interval)) < 0) {
|
||||||
// printf("fd:%d setsockopt SO_KEEPINTVL failed: %d (%s)", sockFd, errno, strerror(errno));
|
// printf("fd:%d setsockopt SO_KEEPINTVL failed: %d (%s)", sockFd, errno, strerror(errno));
|
||||||
taosCloseSocket(&pSocket);
|
taosCloseSocket(&pSocket);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
#endif // __APPLE__
|
#endif // __APPLE__
|
||||||
|
|
||||||
int32_t nodelay = 1;
|
int32_t nodelay = 1;
|
||||||
|
|
|
@ -132,7 +132,7 @@ class TDTestCase:
|
||||||
tdSql.error("create mnode on dnode 2")
|
tdSql.error("create mnode on dnode 2")
|
||||||
tdSql.query("select * from information_schema.ins_dnodes;")
|
tdSql.query("select * from information_schema.ins_dnodes;")
|
||||||
print(tdSql.queryResult)
|
print(tdSql.queryResult)
|
||||||
clusterComCheck.checkDnodes(dnodeNumbers)
|
clusterComCheck.checkDnodes(dnodeNumbers, 60)
|
||||||
|
|
||||||
# create database and stable
|
# create database and stable
|
||||||
clusterComCreate.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica'])
|
clusterComCreate.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica'])
|
||||||
|
|
|
@ -37,10 +37,10 @@ class ClusterComCheck:
|
||||||
tdSql.init(conn.cursor())
|
tdSql.init(conn.cursor())
|
||||||
# tdSql.init(conn.cursor(), logSql) # output sql.txt file
|
# tdSql.init(conn.cursor(), logSql) # output sql.txt file
|
||||||
|
|
||||||
def checkDnodes(self,dnodeNumbers):
|
def checkDnodes(self,dnodeNumbers, timeout=30):
|
||||||
count=0
|
count=0
|
||||||
# print(tdSql)
|
# print(tdSql)
|
||||||
while count < 30:
|
while count < timeout:
|
||||||
tdSql.query("select * from information_schema.ins_dnodes")
|
tdSql.query("select * from information_schema.ins_dnodes")
|
||||||
# tdLog.debug(tdSql.queryResult)
|
# tdLog.debug(tdSql.queryResult)
|
||||||
status=0
|
status=0
|
||||||
|
@ -50,14 +50,14 @@ class ClusterComCheck:
|
||||||
tdLog.info(status)
|
tdLog.info(status)
|
||||||
|
|
||||||
if status == dnodeNumbers:
|
if status == dnodeNumbers:
|
||||||
tdLog.success("it find cluster with %d dnodes and check that all cluster dnodes are ready within 30s! " %dnodeNumbers)
|
tdLog.success("it find cluster with %d dnodes and check that all cluster dnodes are ready within %ds! " % (dnodeNumbers, count))
|
||||||
return True
|
return True
|
||||||
count+=1
|
count+=1
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
else:
|
else:
|
||||||
tdSql.query("select * from information_schema.ins_dnodes")
|
tdSql.query("select * from information_schema.ins_dnodes")
|
||||||
tdLog.debug(tdSql.queryResult)
|
tdLog.debug(tdSql.queryResult)
|
||||||
tdLog.exit("it find cluster with %d dnodes but check that there dnodes are not ready within 30s ! "%dnodeNumbers)
|
tdLog.exit("it find cluster with %d dnodes but check that there dnodes are not ready within %ds ! "% (dnodeNumbers, timeout))
|
||||||
|
|
||||||
def checkDbRows(self,dbNumbers):
|
def checkDbRows(self,dbNumbers):
|
||||||
dbNumbers=int(dbNumbers)
|
dbNumbers=int(dbNumbers)
|
||||||
|
|
Loading…
Reference in New Issue