Merge branch '3.0' into fix/TD-16969
This commit is contained in:
commit
e8bff44bbb
|
@ -222,6 +222,7 @@ void blockDataCleanup(SSDataBlock* pDataBlock);
|
||||||
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
|
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
|
||||||
|
|
||||||
int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n);
|
int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n);
|
||||||
|
int32_t blockDataKeepFirstNRows(SSDataBlock* pBlock, size_t n);
|
||||||
|
|
||||||
int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src);
|
int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src);
|
||||||
int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src);
|
int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src);
|
||||||
|
|
|
@ -78,6 +78,7 @@ int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag);
|
||||||
int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag);
|
int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag);
|
||||||
int32_t tTagToValArray(const STag *pTag, SArray **ppArray);
|
int32_t tTagToValArray(const STag *pTag, SArray **ppArray);
|
||||||
void debugPrintSTag(STag *pTag, const char *tag, int32_t ln); // TODO: remove
|
void debugPrintSTag(STag *pTag, const char *tag, int32_t ln); // TODO: remove
|
||||||
|
int32_t parseJsontoTagData(const char* json, SArray* pTagVals, STag** ppTag, void* pMsgBuf);
|
||||||
|
|
||||||
// STRUCT =================
|
// STRUCT =================
|
||||||
struct STColumn {
|
struct STColumn {
|
||||||
|
|
|
@ -388,6 +388,10 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_QRY_TASK_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x0719)
|
#define TSDB_CODE_QRY_TASK_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x0719)
|
||||||
#define TSDB_CODE_QRY_JOB_FREED TAOS_DEF_ERROR_CODE(0, 0x071A)
|
#define TSDB_CODE_QRY_JOB_FREED TAOS_DEF_ERROR_CODE(0, 0x071A)
|
||||||
#define TSDB_CODE_QRY_TASK_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x071B)
|
#define TSDB_CODE_QRY_TASK_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x071B)
|
||||||
|
//json
|
||||||
|
#define TSDB_CODE_QRY_JSON_IN_ERROR TAOS_DEF_ERROR_CODE(0, 0x071C)
|
||||||
|
#define TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR TAOS_DEF_ERROR_CODE(0, 0x071D)
|
||||||
|
#define TSDB_CODE_QRY_JSON_IN_GROUP_ERROR TAOS_DEF_ERROR_CODE(0, 0x071E)
|
||||||
|
|
||||||
// grant
|
// grant
|
||||||
#define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800)
|
#define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800)
|
||||||
|
|
|
@ -716,7 +716,12 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) {
|
||||||
|
|
||||||
void* left1 = colDataGetData(pColInfoData, left);
|
void* left1 = colDataGetData(pColInfoData, left);
|
||||||
void* right1 = colDataGetData(pColInfoData, right);
|
void* right1 = colDataGetData(pColInfoData, right);
|
||||||
|
if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
|
||||||
|
if (tTagIsJson(left1) || tTagIsJson(right1)) {
|
||||||
|
terrno = TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
__compar_fn_t fn = getKeyComparFunc(pColInfoData->info.type, pOrder->order);
|
__compar_fn_t fn = getKeyComparFunc(pColInfoData->info.type, pOrder->order);
|
||||||
|
|
||||||
int ret = fn(left1, right1);
|
int ret = fn(left1, right1);
|
||||||
|
@ -919,6 +924,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
taosqsort(index, rows, sizeof(int32_t), &helper, dataBlockCompar);
|
taosqsort(index, rows, sizeof(int32_t), &helper, dataBlockCompar);
|
||||||
|
if(terrno) return terrno;
|
||||||
|
|
||||||
int64_t p1 = taosGetTimestampUs();
|
int64_t p1 = taosGetTimestampUs();
|
||||||
|
|
||||||
|
@ -1431,9 +1437,39 @@ static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t colDataMoveVarData(SColumnInfoData* pColInfoData, size_t start, size_t end){
|
||||||
|
int32_t dataOffset = -1;
|
||||||
|
int32_t dataLen = 0;
|
||||||
|
int32_t beigin = start;
|
||||||
|
while(beigin < end){
|
||||||
|
int32_t offset = pColInfoData->varmeta.offset[beigin];
|
||||||
|
if(offset == -1) {
|
||||||
|
beigin++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if(start != 0) {
|
||||||
|
pColInfoData->varmeta.offset[beigin] = dataLen;
|
||||||
|
}
|
||||||
|
char *data = pColInfoData->pData + offset;
|
||||||
|
if(dataOffset == -1) dataOffset = offset; // mark the begin of data
|
||||||
|
int32_t type = pColInfoData->info.type;
|
||||||
|
if (type == TSDB_DATA_TYPE_JSON) {
|
||||||
|
dataLen += getJsonValueLen(data);
|
||||||
|
} else {
|
||||||
|
dataLen += varDataTLen(data);
|
||||||
|
}
|
||||||
|
beigin++;
|
||||||
|
}
|
||||||
|
if(dataOffset > 0){
|
||||||
|
memmove(pColInfoData->pData, pColInfoData->pData + dataOffset, dataLen);
|
||||||
|
memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[start], (end - start) * sizeof(int32_t));
|
||||||
|
}
|
||||||
|
return dataLen;
|
||||||
|
}
|
||||||
|
|
||||||
static void colDataTrimFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) {
|
static void colDataTrimFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) {
|
||||||
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
||||||
memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[n], (total - n) * sizeof(int32_t));
|
pColInfoData->varmeta.length = colDataMoveVarData(pColInfoData, n, total);
|
||||||
memset(&pColInfoData->varmeta.offset[total - n], 0, n);
|
memset(&pColInfoData->varmeta.offset[total - n], 0, n);
|
||||||
} else {
|
} else {
|
||||||
int32_t bytes = pColInfoData->info.bytes;
|
int32_t bytes = pColInfoData->info.bytes;
|
||||||
|
@ -1461,6 +1497,33 @@ int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void colDataKeepFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) {
|
||||||
|
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
||||||
|
pColInfoData->varmeta.length = colDataMoveVarData(pColInfoData, 0, n);
|
||||||
|
memset(&pColInfoData->varmeta.offset[n], 0, total - n);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t blockDataKeepFirstNRows(SSDataBlock* pBlock, size_t n) {
|
||||||
|
if (n == 0) {
|
||||||
|
blockDataCleanup(pBlock);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pBlock->info.rows <= n) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
} else {
|
||||||
|
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||||
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
|
colDataKeepFirstNRows(pColInfoData, n, pBlock->info.rows);
|
||||||
|
}
|
||||||
|
|
||||||
|
pBlock->info.rows = n;
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
|
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
|
||||||
int64_t tbUid = pBlock->info.uid;
|
int64_t tbUid = pBlock->info.uid;
|
||||||
int16_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
int16_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||||
|
|
|
@ -155,7 +155,8 @@ void taosVariantCreateFromBinary(SVariant *pVar, const char *pz, size_t len, uin
|
||||||
void taosVariantDestroy(SVariant *pVar) {
|
void taosVariantDestroy(SVariant *pVar) {
|
||||||
if (pVar == NULL) return;
|
if (pVar == NULL) return;
|
||||||
|
|
||||||
if (pVar->nType == TSDB_DATA_TYPE_BINARY || pVar->nType == TSDB_DATA_TYPE_NCHAR) {
|
if (pVar->nType == TSDB_DATA_TYPE_BINARY || pVar->nType == TSDB_DATA_TYPE_NCHAR
|
||||||
|
|| pVar->nType == TSDB_DATA_TYPE_JSON) {
|
||||||
taosMemoryFreeClear(pVar->pz);
|
taosMemoryFreeClear(pVar->pz);
|
||||||
pVar->nLen = 0;
|
pVar->nLen = 0;
|
||||||
}
|
}
|
||||||
|
@ -184,7 +185,8 @@ void taosVariantAssign(SVariant *pDst, const SVariant *pSrc) {
|
||||||
if (pSrc == NULL || pDst == NULL) return;
|
if (pSrc == NULL || pDst == NULL) return;
|
||||||
|
|
||||||
pDst->nType = pSrc->nType;
|
pDst->nType = pSrc->nType;
|
||||||
if (pSrc->nType == TSDB_DATA_TYPE_BINARY || pSrc->nType == TSDB_DATA_TYPE_NCHAR) {
|
if (pSrc->nType == TSDB_DATA_TYPE_BINARY || pSrc->nType == TSDB_DATA_TYPE_NCHAR
|
||||||
|
|| pSrc->nType == TSDB_DATA_TYPE_JSON) {
|
||||||
int32_t len = pSrc->nLen + TSDB_NCHAR_SIZE;
|
int32_t len = pSrc->nLen + TSDB_NCHAR_SIZE;
|
||||||
char *p = taosMemoryRealloc(pDst->pz, len);
|
char *p = taosMemoryRealloc(pDst->pz, len);
|
||||||
assert(p);
|
assert(p);
|
||||||
|
@ -976,6 +978,7 @@ char *taosVariantGet(SVariant *pVar, int32_t type) {
|
||||||
case TSDB_DATA_TYPE_FLOAT:
|
case TSDB_DATA_TYPE_FLOAT:
|
||||||
return (char *)&pVar->d;
|
return (char *)&pVar->d;
|
||||||
case TSDB_DATA_TYPE_BINARY:
|
case TSDB_DATA_TYPE_BINARY:
|
||||||
|
case TSDB_DATA_TYPE_JSON:
|
||||||
return (char *)pVar->pz;
|
return (char *)pVar->pz;
|
||||||
case TSDB_DATA_TYPE_NCHAR:
|
case TSDB_DATA_TYPE_NCHAR:
|
||||||
return (char *)pVar->ucs4;
|
return (char *)pVar->ucs4;
|
||||||
|
|
|
@ -297,7 +297,7 @@ int32_t appendTagValues(char* buf, int32_t* len, STableCfg* pCfg) {
|
||||||
SArray* pTagVals = NULL;
|
SArray* pTagVals = NULL;
|
||||||
STag* pTag = (STag*)pCfg->pTags;
|
STag* pTag = (STag*)pCfg->pTags;
|
||||||
|
|
||||||
if (pCfg->pTags && pTag->flags & TD_TAG_JSON) {
|
if (pCfg->pTags && tTagIsJson(pTag)) {
|
||||||
char* pJson = parseTagDatatoJson(pTag);
|
char* pJson = parseTagDatatoJson(pTag);
|
||||||
if (pJson) {
|
if (pJson) {
|
||||||
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s", pJson);
|
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s", pJson);
|
||||||
|
|
|
@ -63,6 +63,7 @@ size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
||||||
rowSize += pCtx[i].resDataInfo.interBufSize;
|
rowSize += pCtx[i].resDataInfo.interBufSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rowSize += (numOfOutput * sizeof(bool)); // expand rowSize to mark if col is null for top/bottom result(saveTupleData)
|
||||||
return rowSize;
|
return rowSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -112,7 +113,9 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int
|
||||||
p->groupId = *(uint64_t*)key;
|
p->groupId = *(uint64_t*)key;
|
||||||
p->pos = *(SResultRowPosition*)pData;
|
p->pos = *(SResultRowPosition*)pData;
|
||||||
memcpy(p->key, (char*)key + sizeof(uint64_t), keyLen - sizeof(uint64_t));
|
memcpy(p->key, (char*)key + sizeof(uint64_t), keyLen - sizeof(uint64_t));
|
||||||
|
#ifdef BUF_PAGE_DEBUG
|
||||||
|
qDebug("page_groupRes, groupId:%"PRIu64",pageId:%d,offset:%d\n", p->groupId, p->pos.pageId, p->pos.offset);
|
||||||
|
#endif
|
||||||
taosArrayPush(pGroupResInfo->pRows, &p);
|
taosArrayPush(pGroupResInfo->pRows, &p);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -271,6 +274,7 @@ static bool isTableOk(STableKeyInfo* info, SNode* pTagCond, SMeta* metaHandle) {
|
||||||
SNode* pNew = NULL;
|
SNode* pNew = NULL;
|
||||||
int32_t code = scalarCalculateConstants(pTagCondTmp, &pNew);
|
int32_t code = scalarCalculateConstants(pTagCondTmp, &pNew);
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
terrno = code;
|
||||||
nodesDestroyNode(pTagCondTmp);
|
nodesDestroyNode(pTagCondTmp);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -323,11 +327,18 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo
|
||||||
code = tsdbGetAllTableList(metaHandle, tableUid, pListInfo->pTableList);
|
code = tsdbGetAllTableList(metaHandle, tableUid, pListInfo->pTableList);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("failed to get tableIds, reason: %s, suid: %" PRIu64 "", tstrerror(code), tableUid);
|
||||||
|
terrno = code;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
if(pTagCond){
|
if(pTagCond){
|
||||||
int32_t i = 0;
|
int32_t i = 0;
|
||||||
while (i < taosArrayGetSize(pListInfo->pTableList)) {
|
while (i < taosArrayGetSize(pListInfo->pTableList)) {
|
||||||
STableKeyInfo* info = taosArrayGet(pListInfo->pTableList, i);
|
STableKeyInfo* info = taosArrayGet(pListInfo->pTableList, i);
|
||||||
bool isOk = isTableOk(info, pTagCond, metaHandle);
|
bool isOk = isTableOk(info, pTagCond, metaHandle);
|
||||||
|
if(terrno) return terrno;
|
||||||
if(!isOk){
|
if(!isOk){
|
||||||
taosArrayRemove(pListInfo->pTableList, i);
|
taosArrayRemove(pListInfo->pTableList, i);
|
||||||
continue;
|
continue;
|
||||||
|
@ -586,13 +597,16 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||||
if (strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
|
if (strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "_select_value") == 0 ||
|
||||||
|
strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "_group_key") == 0) {
|
||||||
pValCtx[num++] = &pCtx[i];
|
pValCtx[num++] = &pCtx[i];
|
||||||
} else if (fmIsSelectFunc(pCtx[i].functionId)) {
|
} else if (fmIsSelectFunc(pCtx[i].functionId)) {
|
||||||
p = &pCtx[i];
|
p = &pCtx[i];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#ifdef BUF_PAGE_DEBUG
|
||||||
|
qDebug("page_setSelect num:%d", num);
|
||||||
|
#endif
|
||||||
if (p != NULL) {
|
if (p != NULL) {
|
||||||
p->subsidiaries.pCtx = pValCtx;
|
p->subsidiaries.pCtx = pValCtx;
|
||||||
p->subsidiaries.num = num;
|
p->subsidiaries.num = num;
|
||||||
|
|
|
@ -274,6 +274,9 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
|
||||||
|
|
||||||
// 1. close current opened time window
|
// 1. close current opened time window
|
||||||
if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) {
|
if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) {
|
||||||
|
#ifdef BUF_PAGE_DEBUG
|
||||||
|
qDebug("page_1");
|
||||||
|
#endif
|
||||||
SResultRowPosition pos = pResultRowInfo->cur;
|
SResultRowPosition pos = pResultRowInfo->cur;
|
||||||
SFilePage* pPage = getBufPage(pResultBuf, pos.pageId);
|
SFilePage* pPage = getBufPage(pResultBuf, pos.pageId);
|
||||||
releaseBufPage(pResultBuf, pPage);
|
releaseBufPage(pResultBuf, pPage);
|
||||||
|
@ -281,6 +284,9 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
|
||||||
|
|
||||||
// allocate a new buffer page
|
// allocate a new buffer page
|
||||||
if (pResult == NULL) {
|
if (pResult == NULL) {
|
||||||
|
#ifdef BUF_PAGE_DEBUG
|
||||||
|
qDebug("page_2");
|
||||||
|
#endif
|
||||||
ASSERT(pSup->resultRowSize > 0);
|
ASSERT(pSup->resultRowSize > 0);
|
||||||
pResult = getNewResultRow(pResultBuf, groupId, pSup->resultRowSize);
|
pResult = getNewResultRow(pResultBuf, groupId, pSup->resultRowSize);
|
||||||
|
|
||||||
|
@ -538,7 +544,9 @@ static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunct
|
||||||
if (pCtx[k].fpSet.process == NULL) {
|
if (pCtx[k].fpSet.process == NULL) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
#ifdef BUF_PAGE_DEBUG
|
||||||
|
qDebug("page_process");
|
||||||
|
#endif
|
||||||
int32_t code = pCtx[k].fpSet.process(&pCtx[k]);
|
int32_t code = pCtx[k].fpSet.process(&pCtx[k]);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
|
qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
|
||||||
|
@ -1413,7 +1421,9 @@ void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t
|
||||||
if (pAggInfo->groupId != INT32_MIN && pAggInfo->groupId == groupId) {
|
if (pAggInfo->groupId != INT32_MIN && pAggInfo->groupId == groupId) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
#ifdef BUF_PAGE_DEBUG
|
||||||
|
qDebug("page_setbuf, groupId:%"PRIu64, groupId);
|
||||||
|
#endif
|
||||||
doSetTableGroupOutputBuf(pOperator, pAggInfo, numOfOutput, groupId);
|
doSetTableGroupOutputBuf(pOperator, pAggInfo, numOfOutput, groupId);
|
||||||
|
|
||||||
// record the current active group id
|
// record the current active group id
|
||||||
|
@ -1489,11 +1499,15 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
|
||||||
int32_t numOfExprs) {
|
int32_t numOfExprs) {
|
||||||
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
|
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
|
||||||
int32_t start = pGroupResInfo->index;
|
int32_t start = pGroupResInfo->index;
|
||||||
|
#ifdef BUF_PAGE_DEBUG
|
||||||
|
qDebug("\npage_copytoblock rows:%d", numOfRows);
|
||||||
|
#endif
|
||||||
for (int32_t i = start; i < numOfRows; i += 1) {
|
for (int32_t i = start; i < numOfRows; i += 1) {
|
||||||
SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
|
SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
|
||||||
SFilePage* page = getBufPage(pBuf, pPos->pos.pageId);
|
SFilePage* page = getBufPage(pBuf, pPos->pos.pageId);
|
||||||
|
#ifdef BUF_PAGE_DEBUG
|
||||||
|
qDebug("page_copytoblock pos pageId:%d, offset:%d", pPos->pos.pageId, pPos->pos.offset);
|
||||||
|
#endif
|
||||||
SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);
|
SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);
|
||||||
|
|
||||||
doUpdateNumOfRows(pRow, numOfExprs, rowCellOffset);
|
doUpdateNumOfRows(pRow, numOfExprs, rowCellOffset);
|
||||||
|
@ -1525,6 +1539,9 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
|
||||||
|
|
||||||
pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowCellOffset);
|
pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowCellOffset);
|
||||||
if (pCtx[j].fpSet.finalize) {
|
if (pCtx[j].fpSet.finalize) {
|
||||||
|
#ifdef BUF_PAGE_DEBUG
|
||||||
|
qDebug("\npage_finalize %d", numOfExprs);
|
||||||
|
#endif
|
||||||
int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
|
int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
|
||||||
if (TAOS_FAILED(code)) {
|
if (TAOS_FAILED(code)) {
|
||||||
qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
|
qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
|
||||||
|
@ -1553,9 +1570,9 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
|
||||||
|
|
||||||
releaseBufPage(pBuf, page);
|
releaseBufPage(pBuf, page);
|
||||||
pBlock->info.rows += pRow->numOfRows;
|
pBlock->info.rows += pRow->numOfRows;
|
||||||
if (pBlock->info.rows >= pBlock->info.capacity) { // output buffer is full
|
// if (pBlock->info.rows >= pBlock->info.capacity) { // output buffer is full
|
||||||
break;
|
// break;
|
||||||
}
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
|
qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
|
||||||
|
@ -2373,8 +2390,7 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo*
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
|
pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
|
||||||
pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
|
if (pInfo->pSources == NULL) {
|
||||||
if (pInfo->pSourceDataInfo == NULL || pInfo->pSources == NULL) {
|
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3165,8 +3181,9 @@ static int32_t handleLimitOffset(SOperatorInfo* pOperator, SSDataBlock* pBlock)
|
||||||
}
|
}
|
||||||
|
|
||||||
// check for the limitation in each group
|
// check for the limitation in each group
|
||||||
if (pProjectInfo->limit.limit > 0 && pProjectInfo->curOutput + pRes->info.rows >= pProjectInfo->limit.limit) {
|
if (pProjectInfo->limit.limit >= 0 && pProjectInfo->curOutput + pRes->info.rows >= pProjectInfo->limit.limit) {
|
||||||
pRes->info.rows = (int32_t)(pProjectInfo->limit.limit - pProjectInfo->curOutput);
|
int32_t keepRows = (int32_t)(pProjectInfo->limit.limit - pProjectInfo->curOutput);
|
||||||
|
blockDataKeepFirstNRows(pRes, keepRows);
|
||||||
if (pProjectInfo->slimit.limit > 0 && pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput) {
|
if (pProjectInfo->slimit.limit > 0 && pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput) {
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
}
|
}
|
||||||
|
@ -3480,11 +3497,12 @@ int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInf
|
||||||
}
|
}
|
||||||
|
|
||||||
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows) {
|
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows) {
|
||||||
|
ASSERT(numOfRows != 0);
|
||||||
pOperator->resultInfo.capacity = numOfRows;
|
pOperator->resultInfo.capacity = numOfRows;
|
||||||
pOperator->resultInfo.threshold = numOfRows * 0.75;
|
pOperator->resultInfo.threshold = numOfRows * 0.75;
|
||||||
|
|
||||||
if (pOperator->resultInfo.threshold == 0) {
|
if (pOperator->resultInfo.threshold == 0) {
|
||||||
pOperator->resultInfo.capacity = numOfRows;
|
pOperator->resultInfo.threshold = numOfRows;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3984,12 +4002,12 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum)
|
||||||
return TSDB_CODE_QRY_APP_ERROR;
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
}
|
}
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
if (taosArrayPush(sortSupport, groupId) != NULL) {
|
if (taosArrayPush(sortSupport, groupId) == NULL) {
|
||||||
qError("taos push support array error");
|
qError("taos push support array error");
|
||||||
taosArrayDestroy(sortSupport);
|
taosArrayDestroy(sortSupport);
|
||||||
return TSDB_CODE_QRY_APP_ERROR;
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
}
|
}
|
||||||
if (taosArrayPush(pTableListInfo->pGroupList, &tGroup) != NULL) {
|
if (taosArrayPush(pTableListInfo->pGroupList, &tGroup) == NULL) {
|
||||||
qError("taos push group array error");
|
qError("taos push group array error");
|
||||||
taosArrayDestroy(sortSupport);
|
taosArrayDestroy(sortSupport);
|
||||||
return TSDB_CODE_QRY_APP_ERROR;
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
|
@ -4069,6 +4087,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
|
||||||
} else {
|
} else {
|
||||||
taosMemoryFree(keyBuf);
|
taosMemoryFree(keyBuf);
|
||||||
nodesClearList(groupNew);
|
nodesClearList(groupNew);
|
||||||
|
metaReaderClear(&mr);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4082,6 +4101,13 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
|
||||||
isNull[index++] = 0;
|
isNull[index++] = 0;
|
||||||
char* data = nodesGetValueFromNode(pValue);
|
char* data = nodesGetValueFromNode(pValue);
|
||||||
if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON){
|
if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON){
|
||||||
|
if(tTagIsJson(data)){
|
||||||
|
terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
|
||||||
|
taosMemoryFree(keyBuf);
|
||||||
|
nodesClearList(groupNew);
|
||||||
|
metaReaderClear(&mr);
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
int32_t len = getJsonValueLen(data);
|
int32_t len = getJsonValueLen(data);
|
||||||
memcpy(pStart, data, len);
|
memcpy(pStart, data, len);
|
||||||
pStart += len;
|
pStart += len;
|
||||||
|
@ -4141,6 +4167,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
|
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
|
||||||
int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
|
int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
|
||||||
if(code){
|
if(code){
|
||||||
|
pTaskInfo->code = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
|
code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
|
||||||
|
@ -4167,7 +4194,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
.maxTs = INT64_MIN,
|
.maxTs = INT64_MIN,
|
||||||
};
|
};
|
||||||
if (pHandle) {
|
if (pHandle) {
|
||||||
createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
|
int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
|
||||||
|
if(code){
|
||||||
|
pTaskInfo->code = code;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* pOperator =
|
SOperatorInfo* pOperator =
|
||||||
|
|
|
@ -141,6 +141,10 @@ static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSData
|
||||||
pkey->isNull = false;
|
pkey->isNull = false;
|
||||||
char* val = colDataGetData(pColInfoData, rowIndex);
|
char* val = colDataGetData(pColInfoData, rowIndex);
|
||||||
if (pkey->type == TSDB_DATA_TYPE_JSON) {
|
if (pkey->type == TSDB_DATA_TYPE_JSON) {
|
||||||
|
if(tTagIsJson(val)){
|
||||||
|
terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
|
||||||
|
return;
|
||||||
|
}
|
||||||
int32_t dataLen = getJsonValueLen(val);
|
int32_t dataLen = getJsonValueLen(val);
|
||||||
memcpy(pkey->pData, val, dataLen);
|
memcpy(pkey->pData, val, dataLen);
|
||||||
} else if (IS_VAR_DATA_TYPE(pkey->type)) {
|
} else if (IS_VAR_DATA_TYPE(pkey->type)) {
|
||||||
|
@ -227,11 +231,15 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
STimeWindow w = TSWINDOW_INITIALIZER;
|
STimeWindow w = TSWINDOW_INITIALIZER;
|
||||||
|
|
||||||
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
int32_t num = 0;
|
int32_t num = 0;
|
||||||
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
|
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
|
||||||
// Compare with the previous row of this column, and do not set the output buffer again if they are identical.
|
// Compare with the previous row of this column, and do not set the output buffer again if they are identical.
|
||||||
if (!pInfo->isInit) {
|
if (!pInfo->isInit) {
|
||||||
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
|
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
|
||||||
|
if (terrno != TSDB_CODE_SUCCESS) { // group by json error
|
||||||
|
longjmp(pTaskInfo->env, terrno);
|
||||||
|
}
|
||||||
pInfo->isInit = true;
|
pInfo->isInit = true;
|
||||||
num++;
|
num++;
|
||||||
continue;
|
continue;
|
||||||
|
@ -247,6 +255,9 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
||||||
if (j == 0) {
|
if (j == 0) {
|
||||||
num++;
|
num++;
|
||||||
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
|
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
|
||||||
|
if (terrno != TSDB_CODE_SUCCESS) { // group by json error
|
||||||
|
longjmp(pTaskInfo->env, terrno);
|
||||||
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -661,7 +672,11 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
doHashPartition(pOperator, pBlock);
|
doHashPartition(pOperator, pBlock);
|
||||||
|
if (terrno != TSDB_CODE_SUCCESS) { // group by json error
|
||||||
|
longjmp(pTaskInfo->env, terrno);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* groupArray = taosArrayInit(taosHashGetSize(pInfo->pGroupSet), sizeof(SDataGroupInfo));
|
SArray* groupArray = taosArrayInit(taosHashGetSize(pInfo->pGroupSet), sizeof(SDataGroupInfo));
|
||||||
|
|
|
@ -2090,6 +2090,7 @@ int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle
|
||||||
qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId);
|
qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
pTableListInfo->needSortTableByGroupId = pTableScanNode->groupSort;
|
||||||
code = generateGroupIdMap(pTableListInfo, pHandle, pTableScanNode->pGroupTags);
|
code = generateGroupIdMap(pTableListInfo, pHandle, pTableScanNode->pGroupTags);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -593,7 +593,10 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
|
||||||
if (size > sortBufSize) {
|
if (size > sortBufSize) {
|
||||||
// Perform the in-memory sort and then flush data in the buffer into disk.
|
// Perform the in-memory sort and then flush data in the buffer into disk.
|
||||||
int64_t p = taosGetTimestampUs();
|
int64_t p = taosGetTimestampUs();
|
||||||
blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
|
code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
|
||||||
|
if (code != 0) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int64_t el = taosGetTimestampUs() - p;
|
int64_t el = taosGetTimestampUs() - p;
|
||||||
pHandle->sortElapsed += el;
|
pHandle->sortElapsed += el;
|
||||||
|
@ -608,7 +611,10 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
|
||||||
// Perform the in-memory sort and then flush data in the buffer into disk.
|
// Perform the in-memory sort and then flush data in the buffer into disk.
|
||||||
int64_t p = taosGetTimestampUs();
|
int64_t p = taosGetTimestampUs();
|
||||||
|
|
||||||
blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
|
int32_t code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
|
||||||
|
if (code != 0) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int64_t el = taosGetTimestampUs() - p;
|
int64_t el = taosGetTimestampUs() - p;
|
||||||
pHandle->sortElapsed += el;
|
pHandle->sortElapsed += el;
|
||||||
|
|
|
@ -1553,7 +1553,7 @@ static int32_t translateToJson(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
|
||||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
}
|
}
|
||||||
|
|
||||||
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BINARY].bytes, .type = TSDB_DATA_TYPE_BINARY};
|
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_JSON].bytes, .type = TSDB_DATA_TYPE_JSON};
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2630,7 +2630,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
{
|
{
|
||||||
.name = "_group_key",
|
.name = "_group_key",
|
||||||
.type = FUNCTION_TYPE_GROUP_KEY,
|
.type = FUNCTION_TYPE_GROUP_KEY,
|
||||||
.classification = FUNC_MGT_AGG_FUNC,
|
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC,
|
||||||
.translateFunc = translateGroupKey,
|
.translateFunc = translateGroupKey,
|
||||||
.getEnvFunc = getGroupKeyFuncEnv,
|
.getEnvFunc = getGroupKeyFuncEnv,
|
||||||
.initFunc = functionSetup,
|
.initFunc = functionSetup,
|
||||||
|
|
|
@ -24,6 +24,7 @@
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
#include "thistogram.h"
|
#include "thistogram.h"
|
||||||
#include "tpercentile.h"
|
#include "tpercentile.h"
|
||||||
|
#include "query.h"
|
||||||
|
|
||||||
#define HISTOGRAM_MAX_BINS_NUM 1000
|
#define HISTOGRAM_MAX_BINS_NUM 1000
|
||||||
#define MAVG_MAX_POINTS_NUM 1000
|
#define MAVG_MAX_POINTS_NUM 1000
|
||||||
|
@ -1472,8 +1473,8 @@ void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuple
|
||||||
int32_t pageId = pTuplePos->pageId;
|
int32_t pageId = pTuplePos->pageId;
|
||||||
int32_t offset = pTuplePos->offset;
|
int32_t offset = pTuplePos->offset;
|
||||||
|
|
||||||
if (pTuplePos->pageId != -1) {
|
if (pTuplePos->pageId != -1 && pCtx->subsidiaries.num > 0) {
|
||||||
int32_t numOfCols = taosArrayGetSize(pCtx->pSrcBlock->pDataBlock);
|
int32_t numOfCols = pCtx->subsidiaries.num;
|
||||||
SFilePage* pPage = getBufPage(pCtx->pBuf, pageId);
|
SFilePage* pPage = getBufPage(pCtx->pBuf, pageId);
|
||||||
|
|
||||||
bool* nullList = (bool*)((char*)pPage + offset);
|
bool* nullList = (bool*)((char*)pPage + offset);
|
||||||
|
@ -1484,22 +1485,21 @@ void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuple
|
||||||
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
|
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
|
||||||
|
|
||||||
SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0];
|
SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0];
|
||||||
int32_t srcSlotId = pFuncParam->pCol->slotId;
|
|
||||||
int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;
|
int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;
|
||||||
|
|
||||||
int32_t ps = 0;
|
int32_t ps = 0;
|
||||||
for (int32_t k = 0; k < srcSlotId; ++k) {
|
|
||||||
SColumnInfoData* pSrcCol = taosArrayGet(pCtx->pSrcBlock->pDataBlock, k);
|
|
||||||
ps += pSrcCol->info.bytes;
|
|
||||||
}
|
|
||||||
|
|
||||||
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
|
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
|
||||||
if (nullList[srcSlotId]) {
|
ASSERT(pc->pExpr->base.resSchema.bytes == pDstCol->info.bytes);
|
||||||
|
if (nullList[j]) {
|
||||||
colDataAppendNULL(pDstCol, rowIndex);
|
colDataAppendNULL(pDstCol, rowIndex);
|
||||||
} else {
|
} else {
|
||||||
colDataAppend(pDstCol, rowIndex, (pStart + ps), false);
|
colDataAppend(pDstCol, rowIndex, pStart, false);
|
||||||
}
|
}
|
||||||
|
pStart += pDstCol->info.bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
releaseBufPage(pCtx->pBuf, pPage);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3208,7 +3208,9 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
saveTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos);
|
saveTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos);
|
||||||
}
|
}
|
||||||
|
#ifdef BUF_PAGE_DEBUG
|
||||||
|
qDebug("page_saveTuple i:%d, item:%p,pageId:%d, offset:%d\n", pEntryInfo->numOfRes, pItem, pItem->tuplePos.pageId, pItem->tuplePos.offset);
|
||||||
|
#endif
|
||||||
// allocate the buffer and keep the data of this row into the new allocated buffer
|
// allocate the buffer and keep the data of this row into the new allocated buffer
|
||||||
pEntryInfo->numOfRes++;
|
pEntryInfo->numOfRes++;
|
||||||
taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn,
|
taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn,
|
||||||
|
@ -3229,7 +3231,9 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
copyTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos);
|
copyTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos);
|
||||||
}
|
}
|
||||||
|
#ifdef BUF_PAGE_DEBUG
|
||||||
|
qDebug("page_copyTuple pageId:%d, offset:%d", pItem->tuplePos.pageId, pItem->tuplePos.offset);
|
||||||
|
#endif
|
||||||
taosheapadjust((void*)pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void*)&type,
|
taosheapadjust((void*)pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void*)&type,
|
||||||
topBotResComparFn, NULL, !isTopQuery);
|
topBotResComparFn, NULL, !isTopQuery);
|
||||||
}
|
}
|
||||||
|
@ -3239,7 +3243,11 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
|
||||||
void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) {
|
void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) {
|
||||||
SFilePage* pPage = NULL;
|
SFilePage* pPage = NULL;
|
||||||
|
|
||||||
int32_t completeRowSize = pSrcBlock->info.rowSize + (int32_t) taosArrayGetSize(pSrcBlock->pDataBlock) * sizeof(bool);
|
int32_t completeRowSize = pCtx->subsidiaries.num * sizeof(bool);
|
||||||
|
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
|
||||||
|
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
|
||||||
|
completeRowSize += pc->pExpr->base.resSchema.bytes;
|
||||||
|
}
|
||||||
|
|
||||||
if (pCtx->curBufPage == -1) {
|
if (pCtx->curBufPage == -1) {
|
||||||
pPage = getNewBufPage(pCtx->pBuf, 0, &pCtx->curBufPage);
|
pPage = getNewBufPage(pCtx->pBuf, 0, &pCtx->curBufPage);
|
||||||
|
@ -3257,19 +3265,22 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
|
||||||
// keep the current row data, extract method
|
// keep the current row data, extract method
|
||||||
int32_t offset = 0;
|
int32_t offset = 0;
|
||||||
bool* nullList = (bool*)((char*)pPage + pPage->num);
|
bool* nullList = (bool*)((char*)pPage + pPage->num);
|
||||||
char* pStart = (char*)(nullList + sizeof(bool) * (int32_t) taosArrayGetSize(pSrcBlock->pDataBlock));
|
char* pStart = (char*)(nullList + sizeof(bool) * pCtx->subsidiaries.num);
|
||||||
for (int32_t i = 0; i < (int32_t) taosArrayGetSize(pSrcBlock->pDataBlock); ++i) {
|
for (int32_t i = 0; i < pCtx->subsidiaries.num; ++i) {
|
||||||
SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, i);
|
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[i];
|
||||||
bool isNull = colDataIsNull_s(pCol, rowIndex);
|
|
||||||
if (isNull) {
|
SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0];
|
||||||
nullList[i] = true;
|
int32_t srcSlotId = pFuncParam->pCol->slotId;
|
||||||
|
|
||||||
|
SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId);
|
||||||
|
if ((nullList[i] = colDataIsNull_s(pCol, rowIndex)) == true) {
|
||||||
offset += pCol->info.bytes;
|
offset += pCol->info.bytes;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
char* p = colDataGetData(pCol, rowIndex);
|
char* p = colDataGetData(pCol, rowIndex);
|
||||||
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
|
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
|
||||||
memcpy(pStart + offset, p, varDataTLen(p));
|
memcpy(pStart + offset, p, (pCol->info.type == TSDB_DATA_TYPE_JSON) ? getJsonValueLen(p): varDataTLen(p));
|
||||||
} else {
|
} else {
|
||||||
memcpy(pStart + offset, p, pCol->info.bytes);
|
memcpy(pStart + offset, p, pCol->info.bytes);
|
||||||
}
|
}
|
||||||
|
@ -3287,14 +3298,18 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
|
||||||
void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) {
|
void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) {
|
||||||
SFilePage* pPage = getBufPage(pCtx->pBuf, pPos->pageId);
|
SFilePage* pPage = getBufPage(pCtx->pBuf, pPos->pageId);
|
||||||
|
|
||||||
int32_t numOfCols = taosArrayGetSize(pSrcBlock->pDataBlock);
|
int32_t numOfCols = pCtx->subsidiaries.num;
|
||||||
|
|
||||||
bool* nullList = (bool*)((char*)pPage + pPos->offset);
|
bool* nullList = (bool*)((char*)pPage + pPos->offset);
|
||||||
char* pStart = (char*)(nullList + numOfCols * sizeof(bool));
|
char* pStart = (char*)(nullList + numOfCols * sizeof(bool));
|
||||||
|
|
||||||
int32_t offset = 0;
|
int32_t offset = 0;
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, i);
|
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[i];
|
||||||
|
SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0];
|
||||||
|
int32_t srcSlotId = pFuncParam->pCol->slotId;
|
||||||
|
|
||||||
|
SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId);
|
||||||
if ((nullList[i] = colDataIsNull_s(pCol, rowIndex)) == true) {
|
if ((nullList[i] = colDataIsNull_s(pCol, rowIndex)) == true) {
|
||||||
offset += pCol->info.bytes;
|
offset += pCol->info.bytes;
|
||||||
continue;
|
continue;
|
||||||
|
@ -3302,7 +3317,7 @@ void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
|
||||||
|
|
||||||
char* p = colDataGetData(pCol, rowIndex);
|
char* p = colDataGetData(pCol, rowIndex);
|
||||||
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
|
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
|
||||||
memcpy(pStart + offset, p, varDataTLen(p));
|
memcpy(pStart + offset, p, (pCol->info.type == TSDB_DATA_TYPE_JSON) ? getJsonValueLen(p): varDataTLen(p));
|
||||||
} else {
|
} else {
|
||||||
memcpy(pStart + offset, p, pCol->info.bytes);
|
memcpy(pStart + offset, p, pCol->info.bytes);
|
||||||
}
|
}
|
||||||
|
@ -3316,7 +3331,7 @@ void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
|
||||||
|
|
||||||
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||||
STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
|
STopBotRes* pRes = getTopBotOutputInfo(pCtx);
|
||||||
|
|
||||||
int16_t type = pCtx->input.pData[0]->info.type;
|
int16_t type = pCtx->input.pData[0]->info.type;
|
||||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||||
|
@ -3333,7 +3348,9 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
} else {
|
} else {
|
||||||
colDataAppend(pCol, currentRow, (const char*)&pItem->v.i, false);
|
colDataAppend(pCol, currentRow, (const char*)&pItem->v.i, false);
|
||||||
}
|
}
|
||||||
|
#ifdef BUF_PAGE_DEBUG
|
||||||
|
qDebug("page_finalize i:%d,item:%p,pageId:%d, offset:%d\n", i, pItem, pItem->tuplePos.pageId, pItem->tuplePos.offset);
|
||||||
|
#endif
|
||||||
setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow);
|
setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow);
|
||||||
currentRow += 1;
|
currentRow += 1;
|
||||||
}
|
}
|
||||||
|
@ -5629,8 +5646,6 @@ int32_t groupKeyFunction(SqlFunctionCtx* pCtx) {
|
||||||
SInputColumnInfoData* pInput = &pCtx->input;
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||||
|
|
||||||
int32_t bytes = pInputCol->info.bytes;
|
|
||||||
|
|
||||||
int32_t startIndex = pInput->startRowIndex;
|
int32_t startIndex = pInput->startRowIndex;
|
||||||
|
|
||||||
//escape rest of data blocks to avoid first entry to be overwritten.
|
//escape rest of data blocks to avoid first entry to be overwritten.
|
||||||
|
@ -5645,7 +5660,11 @@ int32_t groupKeyFunction(SqlFunctionCtx* pCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
char* data = colDataGetData(pInputCol, startIndex);
|
char* data = colDataGetData(pInputCol, startIndex);
|
||||||
memcpy(pInfo->data, data, bytes);
|
if (IS_VAR_DATA_TYPE(pInputCol->info.type)) {
|
||||||
|
memcpy(pInfo->data, data, (pInputCol->info.type == TSDB_DATA_TYPE_JSON) ? getJsonValueLen(data): varDataTLen(data));
|
||||||
|
} else {
|
||||||
|
memcpy(pInfo->data, data, pInputCol->info.bytes);
|
||||||
|
}
|
||||||
pInfo->hasResult = true;
|
pInfo->hasResult = true;
|
||||||
|
|
||||||
_group_key_over:
|
_group_key_over:
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
#include "querynodes.h"
|
#include "querynodes.h"
|
||||||
#include "taos.h"
|
#include "taos.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
|
#include "tdatablock.h"
|
||||||
|
|
||||||
#define COPY_SCALAR_FIELD(fldname) \
|
#define COPY_SCALAR_FIELD(fldname) \
|
||||||
do { \
|
do { \
|
||||||
|
@ -164,7 +165,15 @@ static int32_t valueNodeCopy(const SValueNode* pSrc, SValueNode* pDst) {
|
||||||
memcpy(pDst->datum.p, pSrc->datum.p, len);
|
memcpy(pDst->datum.p, pSrc->datum.p, len);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TSDB_DATA_TYPE_JSON:
|
case TSDB_DATA_TYPE_JSON:{
|
||||||
|
int32_t len = getJsonValueLen(pSrc->datum.p);
|
||||||
|
pDst->datum.p = taosMemoryCalloc(1, len);
|
||||||
|
if (NULL == pDst->datum.p) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
memcpy(pDst->datum.p, pSrc->datum.p, len);
|
||||||
|
break;
|
||||||
|
}
|
||||||
case TSDB_DATA_TYPE_DECIMAL:
|
case TSDB_DATA_TYPE_DECIMAL:
|
||||||
case TSDB_DATA_TYPE_BLOB:
|
case TSDB_DATA_TYPE_BLOB:
|
||||||
case TSDB_DATA_TYPE_MEDIUMBLOB:
|
case TSDB_DATA_TYPE_MEDIUMBLOB:
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
#include "querynodes.h"
|
#include "querynodes.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "tjson.h"
|
#include "tjson.h"
|
||||||
|
#include "tdatablock.h"
|
||||||
|
|
||||||
static int32_t nodeToJson(const void* pObj, SJson* pJson);
|
static int32_t nodeToJson(const void* pObj, SJson* pJson);
|
||||||
static int32_t jsonToNode(const SJson* pJson, void* pObj);
|
static int32_t jsonToNode(const SJson* pJson, void* pObj);
|
||||||
|
@ -2629,7 +2630,18 @@ static int32_t datumToJson(const void* pObj, SJson* pJson) {
|
||||||
case TSDB_DATA_TYPE_VARBINARY:
|
case TSDB_DATA_TYPE_VARBINARY:
|
||||||
code = tjsonAddStringToObject(pJson, jkValueDatum, varDataVal(pNode->datum.p));
|
code = tjsonAddStringToObject(pJson, jkValueDatum, varDataVal(pNode->datum.p));
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_JSON:
|
case TSDB_DATA_TYPE_JSON:{
|
||||||
|
int32_t len = getJsonValueLen(pNode->datum.p);
|
||||||
|
char* buf = taosMemoryCalloc( len * 2 + 1, sizeof(char));
|
||||||
|
code = taosHexEncode(pNode->datum.p, buf, len);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||||
|
}
|
||||||
|
code = tjsonAddStringToObject(pJson, jkValueDatum, buf);
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
break;
|
||||||
|
}
|
||||||
case TSDB_DATA_TYPE_DECIMAL:
|
case TSDB_DATA_TYPE_DECIMAL:
|
||||||
case TSDB_DATA_TYPE_BLOB:
|
case TSDB_DATA_TYPE_BLOB:
|
||||||
// todo
|
// todo
|
||||||
|
@ -2752,7 +2764,30 @@ static int32_t jsonToDatum(const SJson* pJson, void* pObj) {
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TSDB_DATA_TYPE_JSON:
|
case TSDB_DATA_TYPE_JSON:{
|
||||||
|
pNode->datum.p = taosMemoryCalloc(1, pNode->node.resType.bytes);
|
||||||
|
if (NULL == pNode->datum.p) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
char* buf = taosMemoryCalloc(1, pNode->node.resType.bytes * 2 + 1);
|
||||||
|
if (NULL == buf) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
code = tjsonGetStringValue(pJson, jkValueDatum, buf);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
code = taosHexDecode(buf, pNode->datum.p, pNode->node.resType.bytes);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
break;
|
||||||
|
}
|
||||||
case TSDB_DATA_TYPE_DECIMAL:
|
case TSDB_DATA_TYPE_DECIMAL:
|
||||||
case TSDB_DATA_TYPE_BLOB:
|
case TSDB_DATA_TYPE_BLOB:
|
||||||
// todo
|
// todo
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
#include "taos.h"
|
#include "taos.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "thash.h"
|
#include "thash.h"
|
||||||
|
#include "tdatablock.h"
|
||||||
|
|
||||||
static SNode* makeNode(ENodeType type, size_t size) {
|
static SNode* makeNode(ENodeType type, size_t size) {
|
||||||
SNode* p = taosMemoryCalloc(1, size);
|
SNode* p = taosMemoryCalloc(1, size);
|
||||||
|
@ -1675,6 +1676,10 @@ void nodesValueNodeToVariant(const SValueNode* pNode, SVariant* pVal) {
|
||||||
pVal->pz[pVal->nLen + VARSTR_HEADER_SIZE] = 0;
|
pVal->pz[pVal->nLen + VARSTR_HEADER_SIZE] = 0;
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_JSON:
|
case TSDB_DATA_TYPE_JSON:
|
||||||
|
pVal->nLen = getJsonValueLen(pNode->datum.p);
|
||||||
|
pVal->pz = taosMemoryMalloc(pVal->nLen);
|
||||||
|
memcpy(pVal->pz, pNode->datum.p, pVal->nLen);
|
||||||
|
break;
|
||||||
case TSDB_DATA_TYPE_DECIMAL:
|
case TSDB_DATA_TYPE_DECIMAL:
|
||||||
case TSDB_DATA_TYPE_BLOB:
|
case TSDB_DATA_TYPE_BLOB:
|
||||||
// todo
|
// todo
|
||||||
|
|
|
@ -62,7 +62,6 @@ int32_t getNumOfColumns(const STableMeta* pTableMeta);
|
||||||
int32_t getNumOfTags(const STableMeta* pTableMeta);
|
int32_t getNumOfTags(const STableMeta* pTableMeta);
|
||||||
STableComInfo getTableInfo(const STableMeta* pTableMeta);
|
STableComInfo getTableInfo(const STableMeta* pTableMeta);
|
||||||
STableMeta* tableMetaDup(const STableMeta* pTableMeta);
|
STableMeta* tableMetaDup(const STableMeta* pTableMeta);
|
||||||
int32_t parseJsontoTagData(const char* json, SArray* pTagVals, STag** ppTag, SMsgBuf* pMsgBuf);
|
|
||||||
|
|
||||||
int32_t trimString(const char* src, int32_t len, char* dst, int32_t dlen);
|
int32_t trimString(const char* src, int32_t len, char* dst, int32_t dlen);
|
||||||
|
|
||||||
|
|
|
@ -219,6 +219,7 @@ int32_t buildInvalidOperationMsg(SMsgBuf* pBuf, const char* msg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t buildSyntaxErrMsg(SMsgBuf* pBuf, const char* additionalInfo, const char* sourceStr) {
|
int32_t buildSyntaxErrMsg(SMsgBuf* pBuf, const char* additionalInfo, const char* sourceStr) {
|
||||||
|
if(pBuf == NULL) return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
|
||||||
const char* msgFormat1 = "syntax error near \'%s\'";
|
const char* msgFormat1 = "syntax error near \'%s\'";
|
||||||
const char* msgFormat2 = "syntax error near \'%s\' (%s)";
|
const char* msgFormat2 = "syntax error near \'%s\' (%s)";
|
||||||
const char* msgFormat3 = "%s";
|
const char* msgFormat3 = "%s";
|
||||||
|
@ -346,7 +347,7 @@ static bool isValidateTag(char* input) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t parseJsontoTagData(const char* json, SArray* pTagVals, STag** ppTag, SMsgBuf* pMsgBuf) {
|
int32_t parseJsontoTagData(const char* json, SArray* pTagVals, STag** ppTag, void* pMsgBuf) {
|
||||||
int32_t retCode = TSDB_CODE_SUCCESS;
|
int32_t retCode = TSDB_CODE_SUCCESS;
|
||||||
cJSON* root = NULL;
|
cJSON* root = NULL;
|
||||||
SHashObj* keyHash = NULL;
|
SHashObj* keyHash = NULL;
|
||||||
|
|
|
@ -305,18 +305,21 @@ int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t
|
||||||
|
|
||||||
char* parseTagDatatoJson(void* p) {
|
char* parseTagDatatoJson(void* p) {
|
||||||
char* string = NULL;
|
char* string = NULL;
|
||||||
cJSON* json = cJSON_CreateObject();
|
|
||||||
if (json == NULL) {
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
|
|
||||||
SArray* pTagVals = NULL;
|
SArray* pTagVals = NULL;
|
||||||
|
cJSON* json = NULL;
|
||||||
if (tTagToValArray((const STag*)p, &pTagVals) != 0) {
|
if (tTagToValArray((const STag*)p, &pTagVals) != 0) {
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
int16_t nCols = taosArrayGetSize(pTagVals);
|
int16_t nCols = taosArrayGetSize(pTagVals);
|
||||||
|
if (nCols == 0) {
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
char tagJsonKey[256] = {0};
|
char tagJsonKey[256] = {0};
|
||||||
|
json = cJSON_CreateObject();
|
||||||
|
if (json == NULL) {
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
for (int j = 0; j < nCols; ++j) {
|
for (int j = 0; j < nCols; ++j) {
|
||||||
STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
|
STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
|
||||||
// json key encode by binary
|
// json key encode by binary
|
||||||
|
@ -374,6 +377,10 @@ char* parseTagDatatoJson(void* p) {
|
||||||
string = cJSON_PrintUnformatted(json);
|
string = cJSON_PrintUnformatted(json);
|
||||||
end:
|
end:
|
||||||
cJSON_Delete(json);
|
cJSON_Delete(json);
|
||||||
|
taosArrayDestroy(pTagVals);
|
||||||
|
if(string == NULL){
|
||||||
|
string = strdup(TSDB_DATA_NULL_STR_L);
|
||||||
|
}
|
||||||
return string;
|
return string;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -192,6 +192,9 @@ int8_t filterGetCompFuncIdx(int32_t type, int32_t optr) {
|
||||||
case TSDB_DATA_TYPE_DOUBLE:
|
case TSDB_DATA_TYPE_DOUBLE:
|
||||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||||
return 18;
|
return 18;
|
||||||
|
case TSDB_DATA_TYPE_JSON:
|
||||||
|
terrno = TSDB_CODE_QRY_JSON_IN_ERROR;
|
||||||
|
return 0;
|
||||||
default:
|
default:
|
||||||
assert(0);
|
assert(0);
|
||||||
}
|
}
|
||||||
|
@ -215,6 +218,9 @@ int8_t filterGetCompFuncIdx(int32_t type, int32_t optr) {
|
||||||
case TSDB_DATA_TYPE_DOUBLE:
|
case TSDB_DATA_TYPE_DOUBLE:
|
||||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||||
return 24;
|
return 24;
|
||||||
|
case TSDB_DATA_TYPE_JSON:
|
||||||
|
terrno = TSDB_CODE_QRY_JSON_IN_ERROR;
|
||||||
|
return 0;
|
||||||
default:
|
default:
|
||||||
assert(0);
|
assert(0);
|
||||||
}
|
}
|
||||||
|
|
|
@ -551,7 +551,9 @@ int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *outp
|
||||||
SScalarParam* pLeft = ¶ms[0];
|
SScalarParam* pLeft = ¶ms[0];
|
||||||
SScalarParam* pRight = paramNum > 1 ? ¶ms[1] : NULL;
|
SScalarParam* pRight = paramNum > 1 ? ¶ms[1] : NULL;
|
||||||
|
|
||||||
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
OperatorFn(pLeft, pRight, output, TSDB_ORDER_ASC);
|
OperatorFn(pLeft, pRight, output, TSDB_ORDER_ASC);
|
||||||
|
code = terrno;
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
for (int32_t i = 0; i < paramNum; ++i) {
|
for (int32_t i = 0; i < paramNum; ++i) {
|
||||||
|
@ -693,7 +695,11 @@ EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) {
|
||||||
res->node.resType.scale = output.columnData->info.scale;
|
res->node.resType.scale = output.columnData->info.scale;
|
||||||
res->node.resType.precision = output.columnData->info.precision;
|
res->node.resType.precision = output.columnData->info.precision;
|
||||||
int32_t type = output.columnData->info.type;
|
int32_t type = output.columnData->info.type;
|
||||||
if (IS_VAR_DATA_TYPE(type)) {
|
if (type == TSDB_DATA_TYPE_JSON){
|
||||||
|
int32_t len = getJsonValueLen(output.columnData->pData);
|
||||||
|
res->datum.p = taosMemoryCalloc(len, 1);
|
||||||
|
memcpy(res->datum.p, output.columnData->pData, len);
|
||||||
|
} else if (IS_VAR_DATA_TYPE(type)) {
|
||||||
res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1);
|
res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1);
|
||||||
memcpy(res->datum.p, output.columnData->pData, varDataTLen(output.columnData->pData));
|
memcpy(res->datum.p, output.columnData->pData, varDataTLen(output.columnData->pData));
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -1152,42 +1152,30 @@ int32_t toJsonFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
|
||||||
|
|
||||||
char tmp[TSDB_MAX_JSON_TAG_LEN] = {0};
|
char tmp[TSDB_MAX_JSON_TAG_LEN] = {0};
|
||||||
for (int32_t i = 0; i < pInput[0].numOfRows; ++i) {
|
for (int32_t i = 0; i < pInput[0].numOfRows; ++i) {
|
||||||
if (colDataIsNull_s(pInput[0].columnData, i)) {
|
SArray* pTagVals = taosArrayInit(8, sizeof(STagVal));
|
||||||
colDataAppendNULL(pOutput->columnData, i);
|
STag* pTag = NULL;
|
||||||
continue;
|
|
||||||
}
|
|
||||||
char *input = pInput[0].columnData->pData + pInput[0].columnData->varmeta.offset[i];
|
|
||||||
|
|
||||||
if(type == TSDB_DATA_TYPE_NCHAR){
|
if (colDataIsNull_s(pInput[0].columnData, i)) {
|
||||||
if (varDataTLen(input) > TSDB_MAX_JSON_TAG_LEN){
|
tTagNew(pTagVals, 1, true, &pTag);
|
||||||
colDataAppendNULL(pOutput->columnData, i);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), tmp);
|
|
||||||
if (len < 0) {
|
|
||||||
colDataAppendNULL(pOutput->columnData, i);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
tmp[len] = 0;
|
|
||||||
}else{
|
}else{
|
||||||
|
char *input = pInput[0].columnData->pData + pInput[0].columnData->varmeta.offset[i];
|
||||||
if (varDataLen(input) > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE){
|
if (varDataLen(input) > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE){
|
||||||
colDataAppendNULL(pOutput->columnData, i);
|
taosArrayDestroy(pTagVals);
|
||||||
continue;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
memcpy(tmp, varDataVal(input), varDataLen(input));
|
memcpy(tmp, varDataVal(input), varDataLen(input));
|
||||||
tmp[varDataLen(input)] = 0;
|
tmp[varDataLen(input)] = 0;
|
||||||
|
if(parseJsontoTagData(tmp, pTagVals, &pTag, NULL)){
|
||||||
|
tTagNew(pTagVals, 1, true, &pTag);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if(!tjsonValidateJson(tmp)){
|
colDataAppend(pOutput->columnData, i, (const char*)pTag, false);
|
||||||
colDataAppendNULL(pOutput->columnData, i);
|
tTagFree(pTag);
|
||||||
continue;
|
taosArrayDestroy(pTagVals);
|
||||||
}
|
|
||||||
|
|
||||||
colDataAppend(pOutput->columnData, i, input, false);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pOutput->numOfRows = pInput->numOfRows;
|
pOutput->numOfRows = pInput->numOfRows;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -160,6 +160,9 @@ int64_t getVectorBigintValue_JSON(void *src, int32_t index){
|
||||||
return 0;
|
return 0;
|
||||||
} else if(*data == TSDB_DATA_TYPE_NCHAR) { // json inner type can not be BINARY
|
} else if(*data == TSDB_DATA_TYPE_NCHAR) { // json inner type can not be BINARY
|
||||||
convertNcharToDouble(data+CHAR_BYTES, &out);
|
convertNcharToDouble(data+CHAR_BYTES, &out);
|
||||||
|
} else if(tTagIsJson(data)){
|
||||||
|
terrno = TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR;
|
||||||
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
convertNumberToNumber(data+CHAR_BYTES, &out, *data, TSDB_DATA_TYPE_DOUBLE);
|
convertNumberToNumber(data+CHAR_BYTES, &out, *data, TSDB_DATA_TYPE_DOUBLE);
|
||||||
}
|
}
|
||||||
|
@ -416,6 +419,9 @@ int32_t vectorConvertFromVarData(const SScalarParam* pIn, SScalarParam* pOut, in
|
||||||
else if(*data == TSDB_DATA_TYPE_NCHAR) {
|
else if(*data == TSDB_DATA_TYPE_NCHAR) {
|
||||||
data += CHAR_BYTES;
|
data += CHAR_BYTES;
|
||||||
convertType = TSDB_DATA_TYPE_NCHAR;
|
convertType = TSDB_DATA_TYPE_NCHAR;
|
||||||
|
} else if(tTagIsJson(data)){
|
||||||
|
terrno = TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR;
|
||||||
|
return terrno;
|
||||||
} else {
|
} else {
|
||||||
convertNumberToNumber(data+CHAR_BYTES, colDataGetNumData(pOut->columnData, i), *data, outType);
|
convertNumberToNumber(data+CHAR_BYTES, colDataGetNumData(pOut->columnData, i), *data, outType);
|
||||||
continue;
|
continue;
|
||||||
|
@ -461,6 +467,9 @@ double getVectorDoubleValue_JSON(void *src, int32_t index){
|
||||||
return out;
|
return out;
|
||||||
} else if(*data == TSDB_DATA_TYPE_NCHAR) { // json inner type can not be BINARY
|
} else if(*data == TSDB_DATA_TYPE_NCHAR) { // json inner type can not be BINARY
|
||||||
convertNcharToDouble(data+CHAR_BYTES, &out);
|
convertNcharToDouble(data+CHAR_BYTES, &out);
|
||||||
|
} else if(tTagIsJson(data)){
|
||||||
|
terrno = TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR;
|
||||||
|
return 0;
|
||||||
} else{
|
} else{
|
||||||
convertNumberToNumber(data+CHAR_BYTES, &out, *data, TSDB_DATA_TYPE_DOUBLE);
|
convertNumberToNumber(data+CHAR_BYTES, &out, *data, TSDB_DATA_TYPE_DOUBLE);
|
||||||
}
|
}
|
||||||
|
@ -493,10 +502,18 @@ bool convertJsonValue(__compar_fn_t *fp, int32_t optr, int8_t typeLeft, int8_t t
|
||||||
}
|
}
|
||||||
|
|
||||||
if(typeLeft == TSDB_DATA_TYPE_JSON){
|
if(typeLeft == TSDB_DATA_TYPE_JSON){
|
||||||
|
if(tTagIsJson(*pLeftData)){
|
||||||
|
terrno = TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
typeLeft = **pLeftData;
|
typeLeft = **pLeftData;
|
||||||
(*pLeftData) ++;
|
(*pLeftData) ++;
|
||||||
}
|
}
|
||||||
if(typeRight == TSDB_DATA_TYPE_JSON){
|
if(typeRight == TSDB_DATA_TYPE_JSON){
|
||||||
|
if(tTagIsJson(*pLeftData)){
|
||||||
|
terrno = TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
typeRight = **pRightData;
|
typeRight = **pRightData;
|
||||||
(*pRightData) ++;
|
(*pRightData) ++;
|
||||||
}
|
}
|
||||||
|
@ -1576,7 +1593,11 @@ void vectorBitOr(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut,
|
||||||
void vectorCompareImpl(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord, int32_t optr) {
|
void vectorCompareImpl(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord, int32_t optr) {
|
||||||
int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->numOfRows, pRight->numOfRows) - 1;
|
int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->numOfRows, pRight->numOfRows) - 1;
|
||||||
int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1;
|
int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1;
|
||||||
|
|
||||||
__compar_fn_t fp = filterGetCompFunc(GET_PARAM_TYPE(pLeft), optr);
|
__compar_fn_t fp = filterGetCompFunc(GET_PARAM_TYPE(pLeft), optr);
|
||||||
|
if(terrno != TSDB_CODE_SUCCESS){
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
pOut->numOfRows = TMAX(pLeft->numOfRows, pRight->numOfRows);
|
pOut->numOfRows = TMAX(pLeft->numOfRows, pRight->numOfRows);
|
||||||
|
|
||||||
|
@ -1709,6 +1730,7 @@ void vectorIsTrue(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut,
|
||||||
STagVal getJsonValue(char *json, char *key, bool *isExist) {
|
STagVal getJsonValue(char *json, char *key, bool *isExist) {
|
||||||
STagVal val = {.pKey = key};
|
STagVal val = {.pKey = key};
|
||||||
if (tTagIsJson((const STag *)json) == false){
|
if (tTagIsJson((const STag *)json) == false){
|
||||||
|
terrno = TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR;
|
||||||
if(isExist){
|
if(isExist){
|
||||||
*isExist = false;
|
*isExist = false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -202,7 +202,7 @@ int32_t taosHexEncode(const char *src, char *dst, int32_t len) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < len; ++i) {
|
for (int32_t i = 0; i < len; ++i) {
|
||||||
sprintf(dst + i * 2, "%02x", src[i] & 0xff);
|
sprintf(dst + i * 2, "%02x", src[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -213,10 +213,10 @@ int32_t taosHexDecode(const char *src, char *dst, int32_t len) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint16_t hn, ln, out;
|
uint8_t hn, ln, out;
|
||||||
for (int i = 0, j = 0; i < len * 2; i += 2, ++j ) {
|
for (int i = 0, j = 0; i < len * 2; i += 2, ++j ) {
|
||||||
hn = src[i] > '9' ? src[i] - 'A' + 10 : src[i] - '0';
|
hn = src[i] > '9' ? src[i] - 'a' + 10 : src[i] - '0';
|
||||||
ln = src[i + 1] > '9' ? src[i + 1] - 'A' + 10 : src[i + 1] - '0';
|
ln = src[i + 1] > '9' ? src[i + 1] - 'a' + 10 : src[i + 1] - '0';
|
||||||
|
|
||||||
out = (hn << 4) | ln;
|
out = (hn << 4) | ln;
|
||||||
memcpy(dst + j, &out, 1);
|
memcpy(dst + j, &out, 1);
|
||||||
|
|
|
@ -394,6 +394,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_DUPLICATTED_OPERATION, "Duplicatted operation
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_MSG_ERROR, "Task message error")
|
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_MSG_ERROR, "Task message error")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JOB_FREED, "Job already freed")
|
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JOB_FREED, "Job already freed")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_STATUS_ERROR, "Task status error")
|
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_STATUS_ERROR, "Task status error")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JSON_IN_ERROR, "Json not support in in/notin operator")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR, "Json not support in this place")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JSON_IN_GROUP_ERROR, "Json not support in group/partition by")
|
||||||
|
|
||||||
// grant
|
// grant
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, "License expired")
|
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, "License expired")
|
||||||
|
|
|
@ -193,7 +193,9 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
|
||||||
|
|
||||||
char* pDataBuf = pg->pData;
|
char* pDataBuf = pg->pData;
|
||||||
memset(pDataBuf, 0, getAllocPageSize(pBuf->pageSize));
|
memset(pDataBuf, 0, getAllocPageSize(pBuf->pageSize));
|
||||||
|
#ifdef BUF_PAGE_DEBUG
|
||||||
|
uDebug("page_flush %p, pageId:%d, offset:%d", pDataBuf, pg->pageId, pg->offset);
|
||||||
|
#endif
|
||||||
pg->length = size; // on disk size
|
pg->length = size; // on disk size
|
||||||
return pDataBuf;
|
return pDataBuf;
|
||||||
}
|
}
|
||||||
|
@ -440,6 +442,9 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
((void**)pi->pData)[0] = pi;
|
((void**)pi->pData)[0] = pi;
|
||||||
|
#ifdef BUF_PAGE_DEBUG
|
||||||
|
uDebug("page_getNewBufPage , pi->pData:%p, pageId:%d, offset:%"PRId64, pi->pData, pi->pageId, pi->offset);
|
||||||
|
#endif
|
||||||
return (void*)(GET_DATA_PAYLOAD(pi));
|
return (void*)(GET_DATA_PAYLOAD(pi));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -462,7 +467,9 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
|
||||||
|
|
||||||
lruListMoveToFront(pBuf->lruList, (*pi));
|
lruListMoveToFront(pBuf->lruList, (*pi));
|
||||||
(*pi)->used = true;
|
(*pi)->used = true;
|
||||||
|
#ifdef BUF_PAGE_DEBUG
|
||||||
|
uDebug("page_getBufPage1 pageId:%d, offset:%"PRId64, (*pi)->pageId, (*pi)->offset);
|
||||||
|
#endif
|
||||||
return (void*)(GET_DATA_PAYLOAD(*pi));
|
return (void*)(GET_DATA_PAYLOAD(*pi));
|
||||||
} else { // not in memory
|
} else { // not in memory
|
||||||
assert((*pi)->pData == NULL && (*pi)->pn == NULL && (((*pi)->length >= 0 && (*pi)->offset >= 0) || ((*pi)->length == -1 && (*pi)->offset == -1)));
|
assert((*pi)->pData == NULL && (*pi)->pn == NULL && (((*pi)->length >= 0 && (*pi)->offset >= 0) || ((*pi)->length == -1 && (*pi)->offset == -1)));
|
||||||
|
@ -494,7 +501,9 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#ifdef BUF_PAGE_DEBUG
|
||||||
|
uDebug("page_getBufPage2 pageId:%d, offset:%"PRId64, (*pi)->pageId, (*pi)->offset);
|
||||||
|
#endif
|
||||||
return (void*)(GET_DATA_PAYLOAD(*pi));
|
return (void*)(GET_DATA_PAYLOAD(*pi));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -506,8 +515,11 @@ void releaseBufPage(SDiskbasedBuf* pBuf, void* page) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) {
|
void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) {
|
||||||
|
#ifdef BUF_PAGE_DEBUG
|
||||||
|
uDebug("page_releaseBufPageInfo pageId:%d, used:%d, offset:%"PRId64, pi->pageId, pi->used, pi->offset);
|
||||||
|
#endif
|
||||||
assert(pi->pData != NULL && pi->used == true);
|
assert(pi->pData != NULL && pi->used == true);
|
||||||
|
// assert(pi->pData != NULL);
|
||||||
pi->used = false;
|
pi->used = false;
|
||||||
pBuf->statis.releasePages += 1;
|
pBuf->statis.releasePages += 1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,7 +38,10 @@ class TDTestCase:
|
||||||
tdSql.init(conn.cursor(), logSql)
|
tdSql.init(conn.cursor(), logSql)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
tdSql.prepare()
|
# tdSql.prepare()
|
||||||
|
tdSql.execute('drop database if exists db')
|
||||||
|
tdSql.execute('create database db vgroups 1')
|
||||||
|
tdSql.execute('use db')
|
||||||
print("============== STEP 1 ===== prepare data & validate json string")
|
print("============== STEP 1 ===== prepare data & validate json string")
|
||||||
tdSql.error("create table if not exists jsons1(ts timestamp, dataInt int, dataBool bool, dataStr nchar(50), dataStrBin binary(150)) tags(jtag json, tagint int)")
|
tdSql.error("create table if not exists jsons1(ts timestamp, dataInt int, dataBool bool, dataStr nchar(50), dataStrBin binary(150)) tags(jtag json, tagint int)")
|
||||||
tdSql.error("create table if not exists jsons1(ts timestamp, data json) tags(tagint int)")
|
tdSql.error("create table if not exists jsons1(ts timestamp, data json) tags(tagint int)")
|
||||||
|
@ -56,6 +59,22 @@ class TDTestCase:
|
||||||
tdSql.query("select jtag from jsons1_8")
|
tdSql.query("select jtag from jsons1_8")
|
||||||
tdSql.checkData(0, 0, '{" ":90,"1tag$":2,"tag1":null}')
|
tdSql.checkData(0, 0, '{" ":90,"1tag$":2,"tag1":null}')
|
||||||
|
|
||||||
|
tdSql.query("select ts,jtag from jsons1 order by ts limit 2,3")
|
||||||
|
tdSql.checkData(0, 0, '2020-06-02 09:17:08.000')
|
||||||
|
tdSql.checkData(0, 1, '{"tag1":5,"tag2":"beijing"}')
|
||||||
|
tdSql.checkData(1, 0, '2020-06-02 09:17:48.000')
|
||||||
|
tdSql.checkData(1, 1, '{"tag1":false,"tag2":"beijing"}')
|
||||||
|
tdSql.checkData(2, 0, '2020-06-02 09:18:48.000')
|
||||||
|
tdSql.checkData(2, 1, '{"tag1":null,"tag2":"shanghai","tag3":"hello"}')
|
||||||
|
|
||||||
|
tdSql.query("select ts,jtag->'tag1' from jsons1 order by ts limit 2,3")
|
||||||
|
tdSql.checkData(0, 0, '2020-06-02 09:17:08.000')
|
||||||
|
tdSql.checkData(0, 1, '5.000000000')
|
||||||
|
tdSql.checkData(1, 0, '2020-06-02 09:17:48.000')
|
||||||
|
tdSql.checkData(1, 1, 'false')
|
||||||
|
tdSql.checkData(2, 0, '2020-06-02 09:18:48.000')
|
||||||
|
tdSql.checkData(2, 1, 'null')
|
||||||
|
|
||||||
# test empty json string, save as jtag is NULL
|
# test empty json string, save as jtag is NULL
|
||||||
tdSql.execute("insert into jsons1_9 using jsons1 tags('\t') values (1591062328000, 24, NULL, '你就会', '2sdw')")
|
tdSql.execute("insert into jsons1_9 using jsons1 tags('\t') values (1591062328000, 24, NULL, '你就会', '2sdw')")
|
||||||
tdSql.execute("CREATE TABLE if not exists jsons1_10 using jsons1 tags('')")
|
tdSql.execute("CREATE TABLE if not exists jsons1_10 using jsons1 tags('')")
|
||||||
|
@ -218,9 +237,19 @@ class TDTestCase:
|
||||||
|
|
||||||
# test where with json tag
|
# test where with json tag
|
||||||
tdSql.query("select * from jsons1_1 where jtag is not null")
|
tdSql.query("select * from jsons1_1 where jtag is not null")
|
||||||
# tdSql.query("select * from jsons1 where jtag='{\"tag1\":11,\"tag2\":\"\"}'")
|
tdSql.error("select * from jsons1 where jtag='{\"tag1\":11,\"tag2\":\"\"}'")
|
||||||
tdSql.error("select * from jsons1 where jtag->'tag1'={}")
|
tdSql.error("select * from jsons1 where jtag->'tag1'={}")
|
||||||
|
|
||||||
|
# test json error
|
||||||
|
tdSql.error("select jtag + 1 from jsons1")
|
||||||
|
tdSql.error("select jtag > 1 from jsons1")
|
||||||
|
tdSql.error("select jtag like \"1\" from jsons1")
|
||||||
|
tdSql.error("select jtag in (\"1\") from jsons1")
|
||||||
|
tdSql.error("select jtag from jsons1 where jtag > 1")
|
||||||
|
tdSql.error("select jtag from jsons1 where jtag like 'fsss'")
|
||||||
|
tdSql.error("select jtag from jsons1 where jtag in (1)")
|
||||||
|
|
||||||
|
|
||||||
# where json value is string
|
# where json value is string
|
||||||
tdSql.query("select * from jsons1 where jtag->'tag2'='beijing'")
|
tdSql.query("select * from jsons1 where jtag->'tag2'='beijing'")
|
||||||
tdSql.checkRows(2)
|
tdSql.checkRows(2)
|
||||||
|
@ -369,7 +398,7 @@ class TDTestCase:
|
||||||
tdSql.checkRows(2)
|
tdSql.checkRows(2)
|
||||||
|
|
||||||
# test where condition in no support in
|
# test where condition in no support in
|
||||||
# tdSql.error("select * from jsons1 where jtag->'tag1' in ('beijing')")
|
tdSql.error("select * from jsons1 where jtag->'tag1' in ('beijing')")
|
||||||
|
|
||||||
# test where condition match/nmath
|
# test where condition match/nmath
|
||||||
tdSql.query("select * from jsons1 where jtag->'tag1' match 'ma'")
|
tdSql.query("select * from jsons1 where jtag->'tag1' match 'ma'")
|
||||||
|
@ -387,8 +416,8 @@ class TDTestCase:
|
||||||
tdSql.execute("insert into jsons1_14 using jsons1 tags('{\"tag1\":\"收到货\",\"tag2\":\"\",\"tag3\":null}') values(1591062628000, 2, NULL, '你就会', 'dws')")
|
tdSql.execute("insert into jsons1_14 using jsons1 tags('{\"tag1\":\"收到货\",\"tag2\":\"\",\"tag3\":null}') values(1591062628000, 2, NULL, '你就会', 'dws')")
|
||||||
tdSql.query("select distinct jtag->'tag1' from jsons1")
|
tdSql.query("select distinct jtag->'tag1' from jsons1")
|
||||||
tdSql.checkRows(8)
|
tdSql.checkRows(8)
|
||||||
tdSql.query("select distinct jtag from jsons1")
|
# tdSql.query("select distinct jtag from jsons1")
|
||||||
tdSql.checkRows(9)
|
# tdSql.checkRows(9)
|
||||||
|
|
||||||
#test dumplicate key with normal colomn
|
#test dumplicate key with normal colomn
|
||||||
tdSql.execute("INSERT INTO jsons1_15 using jsons1 tags('{\"tbname\":\"tt\",\"databool\":true,\"datastr\":\"是是是\"}') values(1591060828000, 4, false, 'jjsf', \"你就会\")")
|
tdSql.execute("INSERT INTO jsons1_15 using jsons1 tags('{\"tbname\":\"tt\",\"databool\":true,\"datastr\":\"是是是\"}') values(1591060828000, 4, false, 'jjsf', \"你就会\")")
|
||||||
|
@ -424,62 +453,56 @@ class TDTestCase:
|
||||||
tdSql.checkData(7, 1, "false")
|
tdSql.checkData(7, 1, "false")
|
||||||
|
|
||||||
|
|
||||||
# tdSql.error("select count(*) from jsons1 group by jtag")
|
tdSql.error("select count(*) from jsons1 group by jtag")
|
||||||
# tdSql.error("select count(*) from jsons1 partition by jtag")
|
tdSql.error("select count(*) from jsons1 partition by jtag")
|
||||||
# tdSql.error("select count(*) from jsons1 group by jtag order by jtag")
|
tdSql.error("select count(*) from jsons1 group by jtag order by jtag")
|
||||||
tdSql.error("select count(*) from jsons1 group by jtag->'tag1' order by jtag->'tag2'")
|
tdSql.error("select count(*) from jsons1 group by jtag->'tag1' order by jtag->'tag2'")
|
||||||
tdSql.error("select count(*) from jsons1 group by jtag->'tag1' order by jtag")
|
tdSql.error("select count(*) from jsons1 group by jtag->'tag1' order by jtag")
|
||||||
# tdSql.query("select count(*),jtag->'tag1' from jsons1 group by jtag->'tag1' order by jtag->'tag1' desc")
|
tdSql.query("select count(*),jtag->'tag1' from jsons1 group by jtag->'tag1' order by jtag->'tag1' desc")
|
||||||
# tdSql.checkRows(8)
|
tdSql.checkRows(8)
|
||||||
# tdSql.checkData(0, 0, 2)
|
tdSql.checkData(0, 0, 2)
|
||||||
# tdSql.checkData(0, 1, '"femail"')
|
tdSql.checkData(0, 1, '"femail"')
|
||||||
# tdSql.checkData(1, 0, 2)
|
tdSql.checkData(1, 0, 2)
|
||||||
# tdSql.checkData(1, 1, '"收到货"')
|
tdSql.checkData(1, 1, '"收到货"')
|
||||||
# tdSql.checkData(2, 0, 1)
|
tdSql.checkData(2, 0, 1)
|
||||||
# tdSql.checkData(2, 1, "11.000000000")
|
tdSql.checkData(2, 1, "11.000000000")
|
||||||
# tdSql.checkData(5, 0, 1)
|
tdSql.checkData(5, 0, 1)
|
||||||
# tdSql.checkData(5, 1, "false")
|
tdSql.checkData(5, 1, "false")
|
||||||
# tdSql.checkData(6, 0, 1)
|
|
||||||
# tdSql.checkData(6, 1, "null")
|
tdSql.query("select count(*),jtag->'tag1' from jsons1 group by jtag->'tag1' order by jtag->'tag1' asc")
|
||||||
# tdSql.checkData(7, 0, 2)
|
tdSql.checkRows(8)
|
||||||
# tdSql.checkData(7, 1, None)
|
tdSql.checkData(0, 1, None)
|
||||||
|
tdSql.checkData(2, 0, 1)
|
||||||
|
tdSql.checkData(2, 1, "false")
|
||||||
|
tdSql.checkData(5, 0, 1)
|
||||||
|
tdSql.checkData(5, 1, "11.000000000")
|
||||||
|
tdSql.checkData(7, 0, 2)
|
||||||
|
tdSql.checkData(7, 1, '"femail"')
|
||||||
|
|
||||||
# tdSql.query("select count(*),jtag->'tag1' from jsons1 group by jtag->'tag1' order by jtag->'tag1' asc")
|
|
||||||
# tdSql.checkRows(8)
|
|
||||||
# tdSql.checkData(0, 0, 2)
|
|
||||||
# tdSql.checkData(0, 1, None)
|
|
||||||
# tdSql.checkData(2, 0, 1)
|
|
||||||
# tdSql.checkData(2, 1, "false")
|
|
||||||
# tdSql.checkData(5, 0, 1)
|
|
||||||
# tdSql.checkData(5, 1, "11.000000000")
|
|
||||||
# tdSql.checkData(7, 0, 2)
|
|
||||||
# tdSql.checkData(7, 1, '"femail"')
|
|
||||||
#
|
|
||||||
# test stddev with group by json tag
|
# test stddev with group by json tag
|
||||||
# tdSql.query("select stddev(dataint),jtag->'tag1' from jsons1 group by jtag->'tag1' order by jtag->'tag1'")
|
tdSql.query("select stddev(dataint),jtag->'tag1' from jsons1 group by jtag->'tag1' order by jtag->'tag1'")
|
||||||
# tdSql.checkRows(8)
|
tdSql.checkRows(8)
|
||||||
# tdSql.checkData(0, 0, 10)
|
tdSql.checkData(0, 1, None)
|
||||||
# tdSql.checkData(0, 1, None)
|
tdSql.checkData(4, 0, 0)
|
||||||
# tdSql.checkData(4, 0, 0)
|
tdSql.checkData(4, 1, "5.000000000")
|
||||||
# tdSql.checkData(4, 1, "5.000000000")
|
tdSql.checkData(7, 0, 11)
|
||||||
# tdSql.checkData(7, 0, 11)
|
tdSql.checkData(7, 1, '"femail"')
|
||||||
# tdSql.checkData(7, 1, '"femail"')
|
|
||||||
#
|
res = tdSql.getColNameList("select stddev(dataint),jsons1.jtag->'tag1' from jsons1 group by jsons1.jtag->'tag1' order by jtag->'tag1'")
|
||||||
# res = tdSql.getColNameList("select stddev(dataint),jsons1.jtag->'tag1' from jsons1 group by jsons1.jtag->'tag1' order by jtag->'tag1'")
|
cname_list = []
|
||||||
# cname_list = []
|
cname_list.append("stddev(dataint)")
|
||||||
# cname_list.append("stddev(dataint)")
|
cname_list.append("jsons1.jtag->'tag1'")
|
||||||
# cname_list.append("jsons1.jtag->'tag1'")
|
tdSql.checkColNameList(res, cname_list)
|
||||||
# tdSql.checkColNameList(res, cname_list)
|
|
||||||
|
|
||||||
# test top/bottom with group by json tag
|
# test top/bottom with group by json tag
|
||||||
# tdSql.query("select top(dataint,2),jtag->'tag1' from jsons1 group by jtag->'tag1' order by jtag->'tag1'")
|
tdSql.query("select top(dataint,2),jtag->'tag1' from jsons1 group by jtag->'tag1' order by jtag->'tag1'")
|
||||||
# tdSql.checkRows(11)
|
tdSql.checkRows(11)
|
||||||
# tdSql.checkData(0, 1, None)
|
tdSql.checkData(0, 1, None)
|
||||||
# tdSql.checkData(2, 0, 4)
|
tdSql.checkData(2, 0, 4)
|
||||||
# tdSql.checkData(3, 0, 3)
|
tdSql.checkData(3, 0, 3)
|
||||||
# tdSql.checkData(3, 1, "false")
|
tdSql.checkData(3, 1, "false")
|
||||||
# tdSql.checkData(8, 0, 2)
|
tdSql.checkData(8, 0, 2)
|
||||||
# tdSql.checkData(10, 1, '"femail"')
|
tdSql.checkData(10, 1, '"femail"')
|
||||||
|
|
||||||
# test having
|
# test having
|
||||||
# tdSql.query("select count(*),jtag->'tag1' from jsons1 group by jtag->'tag1' having count(*) > 1")
|
# tdSql.query("select count(*),jtag->'tag1' from jsons1 group by jtag->'tag1' having count(*) > 1")
|
||||||
|
@ -492,6 +515,7 @@ class TDTestCase:
|
||||||
tdSql.checkData(5, 0, '{"tag1":false,"tag2":"beijing"}')
|
tdSql.checkData(5, 0, '{"tag1":false,"tag2":"beijing"}')
|
||||||
|
|
||||||
tdSql.error("select jtag->'tag1' from (select jtag->'tag1', dataint from jsons1)")
|
tdSql.error("select jtag->'tag1' from (select jtag->'tag1', dataint from jsons1)")
|
||||||
|
tdSql.error("select t->'tag1' from (select jtag->'tag1' as t, dataint from jsons1)")
|
||||||
# tdSql.query("select ts,jtag->'tag1' from (select jtag->'tag1',tbname,ts from jsons1 order by ts)")
|
# tdSql.query("select ts,jtag->'tag1' from (select jtag->'tag1',tbname,ts from jsons1 order by ts)")
|
||||||
# tdSql.checkRows(11)
|
# tdSql.checkRows(11)
|
||||||
# tdSql.checkData(1, 1, "jsons1_1")
|
# tdSql.checkData(1, 1, "jsons1_1")
|
||||||
|
@ -519,9 +543,10 @@ class TDTestCase:
|
||||||
tdSql.checkData(0, 0, 10)
|
tdSql.checkData(0, 0, 10)
|
||||||
tdSql.query("select avg(dataint) from jsons1 where jtag is not null")
|
tdSql.query("select avg(dataint) from jsons1 where jtag is not null")
|
||||||
tdSql.checkData(0, 0, 5.3)
|
tdSql.checkData(0, 0, 5.3)
|
||||||
# tdSql.query("select twa(dataint) from jsons1 where jtag is not null")
|
tdSql.query("select twa(dataint) from jsons1 where jtag is not null")
|
||||||
# tdSql.checkData(0, 0, 36)
|
tdSql.checkData(0, 0, 28.386363636363637)
|
||||||
# tdSql.error("select irate(dataint) from jsons1 where jtag is not null")
|
tdSql.query("select irate(dataint) from jsons1 where jtag is not null")
|
||||||
|
|
||||||
tdSql.query("select sum(dataint) from jsons1 where jtag->'tag1' is not null")
|
tdSql.query("select sum(dataint) from jsons1 where jtag->'tag1' is not null")
|
||||||
tdSql.checkData(0, 0, 45)
|
tdSql.checkData(0, 0, 45)
|
||||||
tdSql.query("select stddev(dataint) from jsons1 where jtag->'tag1'>1")
|
tdSql.query("select stddev(dataint) from jsons1 where jtag->'tag1'>1")
|
||||||
|
@ -549,9 +574,9 @@ class TDTestCase:
|
||||||
|
|
||||||
#test calculation function:diff/derivative/spread/ceil/floor/round/
|
#test calculation function:diff/derivative/spread/ceil/floor/round/
|
||||||
tdSql.query("select diff(dataint) from jsons1 where jtag->'tag1'>1")
|
tdSql.query("select diff(dataint) from jsons1 where jtag->'tag1'>1")
|
||||||
# tdSql.checkRows(2)
|
tdSql.checkRows(2)
|
||||||
# tdSql.checkData(0, 0, -1)
|
tdSql.checkData(0, 0, -1)
|
||||||
# tdSql.checkData(1, 0, 10)
|
tdSql.checkData(1, 0, 10)
|
||||||
tdSql.query("select derivative(dataint, 10m, 0) from jsons1 where jtag->'tag1'>1")
|
tdSql.query("select derivative(dataint, 10m, 0) from jsons1 where jtag->'tag1'>1")
|
||||||
tdSql.checkData(0, 0, -2)
|
tdSql.checkData(0, 0, -2)
|
||||||
tdSql.query("select spread(dataint) from jsons1 where jtag->'tag1'>1")
|
tdSql.query("select spread(dataint) from jsons1 where jtag->'tag1'>1")
|
||||||
|
@ -608,14 +633,14 @@ class TDTestCase:
|
||||||
tdSql.checkRows(1)
|
tdSql.checkRows(1)
|
||||||
|
|
||||||
# function not ready
|
# function not ready
|
||||||
# tdSql.query("select tail(dataint,1) from jsons1 where jtag->'tag1'>1;")
|
tdSql.query("select tail(dataint,1) from jsons1 where jtag->'tag1'>1;")
|
||||||
# tdSql.checkRows(3)
|
tdSql.checkRows(1)
|
||||||
# tdSql.query("select unique(dataint) from jsons1 where jtag->'tag1'>1;")
|
tdSql.query("select unique(dataint) from jsons1 where jtag->'tag1'>1;")
|
||||||
# tdSql.checkRows(3)
|
tdSql.checkRows(3)
|
||||||
# tdSql.query("select mode(dataint) from jsons1 where jtag->'tag1'>1;")
|
tdSql.query("select mode(dataint) from jsons1 where jtag->'tag1'>1;")
|
||||||
# tdSql.checkRows(3)
|
tdSql.checkRows(1)
|
||||||
# tdSql.query("select irate(dataint) from jsons1 where jtag->'tag1'>1;")
|
tdSql.query("select irate(dataint) from jsons1 where jtag->'tag1'>1;")
|
||||||
# tdSql.checkRows(1)
|
tdSql.checkRows(1)
|
||||||
|
|
||||||
#str function
|
#str function
|
||||||
tdSql.query("select upper(dataStr) from jsons1 where jtag->'tag1'>1;")
|
tdSql.query("select upper(dataStr) from jsons1 where jtag->'tag1'>1;")
|
||||||
|
@ -659,13 +684,26 @@ class TDTestCase:
|
||||||
tdSql.query("select ELAPSED(ts,1h) from jsons1 where jtag->'tag1'>1;")
|
tdSql.query("select ELAPSED(ts,1h) from jsons1 where jtag->'tag1'>1;")
|
||||||
tdSql.checkRows(1)
|
tdSql.checkRows(1)
|
||||||
|
|
||||||
#
|
# to_json()
|
||||||
# #test TD-12077
|
tdSql.query("select to_json('{\"abc\":123}') from jsons1_1")
|
||||||
|
tdSql.checkRows(2)
|
||||||
|
# tdSql.checkData(0, 0, '{"abc":123}')
|
||||||
|
# tdSql.checkData(1, 0, '{"abc":123}')
|
||||||
|
tdSql.query("select to_json('null') from jsons1_1")
|
||||||
|
tdSql.checkRows(2)
|
||||||
|
tdSql.checkData(0, 0, 'null')
|
||||||
|
tdSql.checkData(1, 0, 'null')
|
||||||
|
tdSql.query("select to_json('{\"key\"}') from jsons1_1")
|
||||||
|
tdSql.checkRows(2)
|
||||||
|
tdSql.checkData(0, 0, 'null')
|
||||||
|
tdSql.checkData(1, 0, 'null')
|
||||||
|
|
||||||
|
#test TD-12077
|
||||||
tdSql.execute("insert into jsons1_16 using jsons1 tags('{\"tag1\":\"收到货\",\"tag2\":\"\",\"tag3\":-2.111}') values(1591062628000, 2, NULL, '你就会', 'dws')")
|
tdSql.execute("insert into jsons1_16 using jsons1 tags('{\"tag1\":\"收到货\",\"tag2\":\"\",\"tag3\":-2.111}') values(1591062628000, 2, NULL, '你就会', 'dws')")
|
||||||
tdSql.query("select jtag->'tag3' from jsons1_16")
|
tdSql.query("select jtag->'tag3' from jsons1_16")
|
||||||
tdSql.checkData(0, 0, '-2.111000000')
|
tdSql.checkData(0, 0, '-2.111000000')
|
||||||
|
|
||||||
# # test TD-12452
|
# test TD-12452
|
||||||
tdSql.execute("ALTER TABLE jsons1_1 SET TAG jtag=NULL")
|
tdSql.execute("ALTER TABLE jsons1_1 SET TAG jtag=NULL")
|
||||||
tdSql.query("select jtag from jsons1_1")
|
tdSql.query("select jtag from jsons1_1")
|
||||||
tdSql.checkData(0, 0, None)
|
tdSql.checkData(0, 0, None)
|
||||||
|
|
|
@ -63,7 +63,7 @@ python3 ./test.py -f 2-query/To_unixtimestamp.py
|
||||||
python3 ./test.py -f 2-query/timetruncate.py
|
python3 ./test.py -f 2-query/timetruncate.py
|
||||||
python3 ./test.py -f 2-query/diff.py
|
python3 ./test.py -f 2-query/diff.py
|
||||||
python3 ./test.py -f 2-query/Timediff.py
|
python3 ./test.py -f 2-query/Timediff.py
|
||||||
#python3 ./test.py -f 2-query/json_tag.py
|
python3 ./test.py -f 2-query/json_tag.py
|
||||||
|
|
||||||
python3 ./test.py -f 2-query/top.py
|
python3 ./test.py -f 2-query/top.py
|
||||||
python3 ./test.py -f 2-query/bottom.py
|
python3 ./test.py -f 2-query/bottom.py
|
||||||
|
|
|
@ -482,6 +482,7 @@ int32_t shellReadCommand(char *command) {
|
||||||
#endif
|
#endif
|
||||||
break;
|
break;
|
||||||
case 4: // EOF or Ctrl+D
|
case 4: // EOF or Ctrl+D
|
||||||
|
taosResetTerminalMode();
|
||||||
printf("\n");
|
printf("\n");
|
||||||
return -1;
|
return -1;
|
||||||
case 5: // ctrl E
|
case 5: // ctrl E
|
||||||
|
|
Loading…
Reference in New Issue