Merge branch '3.0' into fix/TD-15257
This commit is contained in:
commit
f44e5310da
|
@ -222,6 +222,7 @@ void blockDataCleanup(SSDataBlock* pDataBlock);
|
|||
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
|
||||
|
||||
int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n);
|
||||
int32_t blockDataKeepFirstNRows(SSDataBlock* pBlock, size_t n);
|
||||
|
||||
int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src);
|
||||
int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src);
|
||||
|
|
|
@ -78,6 +78,7 @@ int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag);
|
|||
int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag);
|
||||
int32_t tTagToValArray(const STag *pTag, SArray **ppArray);
|
||||
void debugPrintSTag(STag *pTag, const char *tag, int32_t ln); // TODO: remove
|
||||
int32_t parseJsontoTagData(const char* json, SArray* pTagVals, STag** ppTag, void* pMsgBuf);
|
||||
|
||||
// STRUCT =================
|
||||
struct STColumn {
|
||||
|
|
|
@ -388,6 +388,10 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_QRY_TASK_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x0719)
|
||||
#define TSDB_CODE_QRY_JOB_FREED TAOS_DEF_ERROR_CODE(0, 0x071A)
|
||||
#define TSDB_CODE_QRY_TASK_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x071B)
|
||||
//json
|
||||
#define TSDB_CODE_QRY_JSON_IN_ERROR TAOS_DEF_ERROR_CODE(0, 0x071C)
|
||||
#define TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR TAOS_DEF_ERROR_CODE(0, 0x071D)
|
||||
#define TSDB_CODE_QRY_JSON_IN_GROUP_ERROR TAOS_DEF_ERROR_CODE(0, 0x071E)
|
||||
|
||||
// grant
|
||||
#define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800)
|
||||
|
|
|
@ -716,7 +716,12 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) {
|
|||
|
||||
void* left1 = colDataGetData(pColInfoData, left);
|
||||
void* right1 = colDataGetData(pColInfoData, right);
|
||||
|
||||
if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
|
||||
if (tTagIsJson(left1) || tTagIsJson(right1)) {
|
||||
terrno = TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
__compar_fn_t fn = getKeyComparFunc(pColInfoData->info.type, pOrder->order);
|
||||
|
||||
int ret = fn(left1, right1);
|
||||
|
@ -919,6 +924,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
|
|||
}
|
||||
|
||||
taosqsort(index, rows, sizeof(int32_t), &helper, dataBlockCompar);
|
||||
if(terrno) return terrno;
|
||||
|
||||
int64_t p1 = taosGetTimestampUs();
|
||||
|
||||
|
@ -1431,9 +1437,39 @@ static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t colDataMoveVarData(SColumnInfoData* pColInfoData, size_t start, size_t end){
|
||||
int32_t dataOffset = -1;
|
||||
int32_t dataLen = 0;
|
||||
int32_t beigin = start;
|
||||
while(beigin < end){
|
||||
int32_t offset = pColInfoData->varmeta.offset[beigin];
|
||||
if(offset == -1) {
|
||||
beigin++;
|
||||
continue;
|
||||
}
|
||||
if(start != 0) {
|
||||
pColInfoData->varmeta.offset[beigin] = dataLen;
|
||||
}
|
||||
char *data = pColInfoData->pData + offset;
|
||||
if(dataOffset == -1) dataOffset = offset; // mark the begin of data
|
||||
int32_t type = pColInfoData->info.type;
|
||||
if (type == TSDB_DATA_TYPE_JSON) {
|
||||
dataLen += getJsonValueLen(data);
|
||||
} else {
|
||||
dataLen += varDataTLen(data);
|
||||
}
|
||||
beigin++;
|
||||
}
|
||||
if(dataOffset > 0){
|
||||
memmove(pColInfoData->pData, pColInfoData->pData + dataOffset, dataLen);
|
||||
memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[start], (end - start) * sizeof(int32_t));
|
||||
}
|
||||
return dataLen;
|
||||
}
|
||||
|
||||
static void colDataTrimFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) {
|
||||
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
||||
memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[n], (total - n) * sizeof(int32_t));
|
||||
pColInfoData->varmeta.length = colDataMoveVarData(pColInfoData, n, total);
|
||||
memset(&pColInfoData->varmeta.offset[total - n], 0, n);
|
||||
} else {
|
||||
int32_t bytes = pColInfoData->info.bytes;
|
||||
|
@ -1461,6 +1497,33 @@ int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void colDataKeepFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) {
|
||||
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
||||
pColInfoData->varmeta.length = colDataMoveVarData(pColInfoData, 0, n);
|
||||
memset(&pColInfoData->varmeta.offset[n], 0, total - n);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t blockDataKeepFirstNRows(SSDataBlock* pBlock, size_t n) {
|
||||
if (n == 0) {
|
||||
blockDataCleanup(pBlock);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (pBlock->info.rows <= n) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||
colDataKeepFirstNRows(pColInfoData, n, pBlock->info.rows);
|
||||
}
|
||||
|
||||
pBlock->info.rows = n;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
|
||||
int64_t tbUid = pBlock->info.uid;
|
||||
int16_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||
|
|
|
@ -155,7 +155,8 @@ void taosVariantCreateFromBinary(SVariant *pVar, const char *pz, size_t len, uin
|
|||
void taosVariantDestroy(SVariant *pVar) {
|
||||
if (pVar == NULL) return;
|
||||
|
||||
if (pVar->nType == TSDB_DATA_TYPE_BINARY || pVar->nType == TSDB_DATA_TYPE_NCHAR) {
|
||||
if (pVar->nType == TSDB_DATA_TYPE_BINARY || pVar->nType == TSDB_DATA_TYPE_NCHAR
|
||||
|| pVar->nType == TSDB_DATA_TYPE_JSON) {
|
||||
taosMemoryFreeClear(pVar->pz);
|
||||
pVar->nLen = 0;
|
||||
}
|
||||
|
@ -184,7 +185,8 @@ void taosVariantAssign(SVariant *pDst, const SVariant *pSrc) {
|
|||
if (pSrc == NULL || pDst == NULL) return;
|
||||
|
||||
pDst->nType = pSrc->nType;
|
||||
if (pSrc->nType == TSDB_DATA_TYPE_BINARY || pSrc->nType == TSDB_DATA_TYPE_NCHAR) {
|
||||
if (pSrc->nType == TSDB_DATA_TYPE_BINARY || pSrc->nType == TSDB_DATA_TYPE_NCHAR
|
||||
|| pSrc->nType == TSDB_DATA_TYPE_JSON) {
|
||||
int32_t len = pSrc->nLen + TSDB_NCHAR_SIZE;
|
||||
char *p = taosMemoryRealloc(pDst->pz, len);
|
||||
assert(p);
|
||||
|
@ -976,6 +978,7 @@ char *taosVariantGet(SVariant *pVar, int32_t type) {
|
|||
case TSDB_DATA_TYPE_FLOAT:
|
||||
return (char *)&pVar->d;
|
||||
case TSDB_DATA_TYPE_BINARY:
|
||||
case TSDB_DATA_TYPE_JSON:
|
||||
return (char *)pVar->pz;
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
return (char *)pVar->ucs4;
|
||||
|
|
|
@ -297,7 +297,7 @@ int32_t appendTagValues(char* buf, int32_t* len, STableCfg* pCfg) {
|
|||
SArray* pTagVals = NULL;
|
||||
STag* pTag = (STag*)pCfg->pTags;
|
||||
|
||||
if (pCfg->pTags && pTag->flags & TD_TAG_JSON) {
|
||||
if (pCfg->pTags && tTagIsJson(pTag)) {
|
||||
char* pJson = parseTagDatatoJson(pTag);
|
||||
if (pJson) {
|
||||
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s", pJson);
|
||||
|
|
|
@ -63,6 +63,7 @@ size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
|||
rowSize += pCtx[i].resDataInfo.interBufSize;
|
||||
}
|
||||
|
||||
rowSize += (numOfOutput * sizeof(bool)); // expand rowSize to mark if col is null for top/bottom result(saveTupleData)
|
||||
return rowSize;
|
||||
}
|
||||
|
||||
|
@ -112,7 +113,9 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int
|
|||
p->groupId = *(uint64_t*)key;
|
||||
p->pos = *(SResultRowPosition*)pData;
|
||||
memcpy(p->key, (char*)key + sizeof(uint64_t), keyLen - sizeof(uint64_t));
|
||||
|
||||
#ifdef BUF_PAGE_DEBUG
|
||||
qDebug("page_groupRes, groupId:%"PRIu64",pageId:%d,offset:%d\n", p->groupId, p->pos.pageId, p->pos.offset);
|
||||
#endif
|
||||
taosArrayPush(pGroupResInfo->pRows, &p);
|
||||
}
|
||||
|
||||
|
@ -271,6 +274,7 @@ static bool isTableOk(STableKeyInfo* info, SNode* pTagCond, SMeta* metaHandle) {
|
|||
SNode* pNew = NULL;
|
||||
int32_t code = scalarCalculateConstants(pTagCondTmp, &pNew);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
terrno = code;
|
||||
nodesDestroyNode(pTagCondTmp);
|
||||
return false;
|
||||
}
|
||||
|
@ -323,11 +327,18 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo
|
|||
code = tsdbGetAllTableList(metaHandle, tableUid, pListInfo->pTableList);
|
||||
}
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("failed to get tableIds, reason: %s, suid: %" PRIu64 "", tstrerror(code), tableUid);
|
||||
terrno = code;
|
||||
return code;
|
||||
}
|
||||
|
||||
if(pTagCond){
|
||||
int32_t i = 0;
|
||||
while (i < taosArrayGetSize(pListInfo->pTableList)) {
|
||||
STableKeyInfo* info = taosArrayGet(pListInfo->pTableList, i);
|
||||
bool isOk = isTableOk(info, pTagCond, metaHandle);
|
||||
if(terrno) return terrno;
|
||||
if(!isOk){
|
||||
taosArrayRemove(pListInfo->pTableList, i);
|
||||
continue;
|
||||
|
@ -586,13 +597,16 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu
|
|||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
if (strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
|
||||
if (strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "_select_value") == 0 ||
|
||||
strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "_group_key") == 0) {
|
||||
pValCtx[num++] = &pCtx[i];
|
||||
} else if (fmIsSelectFunc(pCtx[i].functionId)) {
|
||||
p = &pCtx[i];
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef BUF_PAGE_DEBUG
|
||||
qDebug("page_setSelect num:%d", num);
|
||||
#endif
|
||||
if (p != NULL) {
|
||||
p->subsidiaries.pCtx = pValCtx;
|
||||
p->subsidiaries.num = num;
|
||||
|
|
|
@ -274,6 +274,9 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
|
|||
|
||||
// 1. close current opened time window
|
||||
if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) {
|
||||
#ifdef BUF_PAGE_DEBUG
|
||||
qDebug("page_1");
|
||||
#endif
|
||||
SResultRowPosition pos = pResultRowInfo->cur;
|
||||
SFilePage* pPage = getBufPage(pResultBuf, pos.pageId);
|
||||
releaseBufPage(pResultBuf, pPage);
|
||||
|
@ -281,6 +284,9 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
|
|||
|
||||
// allocate a new buffer page
|
||||
if (pResult == NULL) {
|
||||
#ifdef BUF_PAGE_DEBUG
|
||||
qDebug("page_2");
|
||||
#endif
|
||||
ASSERT(pSup->resultRowSize > 0);
|
||||
pResult = getNewResultRow(pResultBuf, groupId, pSup->resultRowSize);
|
||||
|
||||
|
@ -538,7 +544,9 @@ static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunct
|
|||
if (pCtx[k].fpSet.process == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
#ifdef BUF_PAGE_DEBUG
|
||||
qDebug("page_process");
|
||||
#endif
|
||||
int32_t code = pCtx[k].fpSet.process(&pCtx[k]);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
|
||||
|
@ -1413,7 +1421,9 @@ void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t
|
|||
if (pAggInfo->groupId != INT32_MIN && pAggInfo->groupId == groupId) {
|
||||
return;
|
||||
}
|
||||
|
||||
#ifdef BUF_PAGE_DEBUG
|
||||
qDebug("page_setbuf, groupId:%"PRIu64, groupId);
|
||||
#endif
|
||||
doSetTableGroupOutputBuf(pOperator, pAggInfo, numOfOutput, groupId);
|
||||
|
||||
// record the current active group id
|
||||
|
@ -1489,11 +1499,15 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
|
|||
int32_t numOfExprs) {
|
||||
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
|
||||
int32_t start = pGroupResInfo->index;
|
||||
|
||||
#ifdef BUF_PAGE_DEBUG
|
||||
qDebug("\npage_copytoblock rows:%d", numOfRows);
|
||||
#endif
|
||||
for (int32_t i = start; i < numOfRows; i += 1) {
|
||||
SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
|
||||
SFilePage* page = getBufPage(pBuf, pPos->pos.pageId);
|
||||
|
||||
#ifdef BUF_PAGE_DEBUG
|
||||
qDebug("page_copytoblock pos pageId:%d, offset:%d", pPos->pos.pageId, pPos->pos.offset);
|
||||
#endif
|
||||
SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);
|
||||
|
||||
doUpdateNumOfRows(pRow, numOfExprs, rowCellOffset);
|
||||
|
@ -1525,6 +1539,9 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
|
|||
|
||||
pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowCellOffset);
|
||||
if (pCtx[j].fpSet.finalize) {
|
||||
#ifdef BUF_PAGE_DEBUG
|
||||
qDebug("\npage_finalize %d", numOfExprs);
|
||||
#endif
|
||||
int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
|
||||
if (TAOS_FAILED(code)) {
|
||||
qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
|
||||
|
@ -1553,9 +1570,9 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
|
|||
|
||||
releaseBufPage(pBuf, page);
|
||||
pBlock->info.rows += pRow->numOfRows;
|
||||
if (pBlock->info.rows >= pBlock->info.capacity) { // output buffer is full
|
||||
break;
|
||||
}
|
||||
// if (pBlock->info.rows >= pBlock->info.capacity) { // output buffer is full
|
||||
// break;
|
||||
// }
|
||||
}
|
||||
|
||||
qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
|
||||
|
@ -2373,8 +2390,7 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo*
|
|||
}
|
||||
|
||||
pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
|
||||
pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
|
||||
if (pInfo->pSourceDataInfo == NULL || pInfo->pSources == NULL) {
|
||||
if (pInfo->pSources == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
|
@ -3165,8 +3181,9 @@ static int32_t handleLimitOffset(SOperatorInfo* pOperator, SSDataBlock* pBlock)
|
|||
}
|
||||
|
||||
// check for the limitation in each group
|
||||
if (pProjectInfo->limit.limit > 0 && pProjectInfo->curOutput + pRes->info.rows >= pProjectInfo->limit.limit) {
|
||||
pRes->info.rows = (int32_t)(pProjectInfo->limit.limit - pProjectInfo->curOutput);
|
||||
if (pProjectInfo->limit.limit >= 0 && pProjectInfo->curOutput + pRes->info.rows >= pProjectInfo->limit.limit) {
|
||||
int32_t keepRows = (int32_t)(pProjectInfo->limit.limit - pProjectInfo->curOutput);
|
||||
blockDataKeepFirstNRows(pRes, keepRows);
|
||||
if (pProjectInfo->slimit.limit > 0 && pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
}
|
||||
|
@ -3480,11 +3497,12 @@ int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInf
|
|||
}
|
||||
|
||||
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows) {
|
||||
ASSERT(numOfRows != 0);
|
||||
pOperator->resultInfo.capacity = numOfRows;
|
||||
pOperator->resultInfo.threshold = numOfRows * 0.75;
|
||||
|
||||
if (pOperator->resultInfo.threshold == 0) {
|
||||
pOperator->resultInfo.capacity = numOfRows;
|
||||
pOperator->resultInfo.threshold = numOfRows;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3984,12 +4002,12 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum)
|
|||
return TSDB_CODE_QRY_APP_ERROR;
|
||||
}
|
||||
if (p == NULL) {
|
||||
if (taosArrayPush(sortSupport, groupId) != NULL) {
|
||||
if (taosArrayPush(sortSupport, groupId) == NULL) {
|
||||
qError("taos push support array error");
|
||||
taosArrayDestroy(sortSupport);
|
||||
return TSDB_CODE_QRY_APP_ERROR;
|
||||
}
|
||||
if (taosArrayPush(pTableListInfo->pGroupList, &tGroup) != NULL) {
|
||||
if (taosArrayPush(pTableListInfo->pGroupList, &tGroup) == NULL) {
|
||||
qError("taos push group array error");
|
||||
taosArrayDestroy(sortSupport);
|
||||
return TSDB_CODE_QRY_APP_ERROR;
|
||||
|
@ -4069,6 +4087,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
|
|||
} else {
|
||||
taosMemoryFree(keyBuf);
|
||||
nodesClearList(groupNew);
|
||||
metaReaderClear(&mr);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -4082,6 +4101,13 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
|
|||
isNull[index++] = 0;
|
||||
char* data = nodesGetValueFromNode(pValue);
|
||||
if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON){
|
||||
if(tTagIsJson(data)){
|
||||
terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
|
||||
taosMemoryFree(keyBuf);
|
||||
nodesClearList(groupNew);
|
||||
metaReaderClear(&mr);
|
||||
return terrno;
|
||||
}
|
||||
int32_t len = getJsonValueLen(data);
|
||||
memcpy(pStart, data, len);
|
||||
pStart += len;
|
||||
|
@ -4141,6 +4167,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
|
||||
int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
|
||||
if(code){
|
||||
pTaskInfo->code = code;
|
||||
return NULL;
|
||||
}
|
||||
code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
|
||||
|
@ -4167,7 +4194,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
.maxTs = INT64_MIN,
|
||||
};
|
||||
if (pHandle) {
|
||||
createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
|
||||
int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
|
||||
if(code){
|
||||
pTaskInfo->code = code;
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
SOperatorInfo* pOperator =
|
||||
|
|
|
@ -141,6 +141,10 @@ static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSData
|
|||
pkey->isNull = false;
|
||||
char* val = colDataGetData(pColInfoData, rowIndex);
|
||||
if (pkey->type == TSDB_DATA_TYPE_JSON) {
|
||||
if(tTagIsJson(val)){
|
||||
terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
|
||||
return;
|
||||
}
|
||||
int32_t dataLen = getJsonValueLen(val);
|
||||
memcpy(pkey->pData, val, dataLen);
|
||||
} else if (IS_VAR_DATA_TYPE(pkey->type)) {
|
||||
|
@ -227,11 +231,15 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
|||
int32_t len = 0;
|
||||
STimeWindow w = TSWINDOW_INITIALIZER;
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
int32_t num = 0;
|
||||
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
|
||||
// Compare with the previous row of this column, and do not set the output buffer again if they are identical.
|
||||
if (!pInfo->isInit) {
|
||||
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
|
||||
if (terrno != TSDB_CODE_SUCCESS) { // group by json error
|
||||
longjmp(pTaskInfo->env, terrno);
|
||||
}
|
||||
pInfo->isInit = true;
|
||||
num++;
|
||||
continue;
|
||||
|
@ -247,6 +255,9 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
|||
if (j == 0) {
|
||||
num++;
|
||||
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
|
||||
if (terrno != TSDB_CODE_SUCCESS) { // group by json error
|
||||
longjmp(pTaskInfo->env, terrno);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -661,7 +672,11 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
|
|||
}
|
||||
}
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
doHashPartition(pOperator, pBlock);
|
||||
if (terrno != TSDB_CODE_SUCCESS) { // group by json error
|
||||
longjmp(pTaskInfo->env, terrno);
|
||||
}
|
||||
}
|
||||
|
||||
SArray* groupArray = taosArrayInit(taosHashGetSize(pInfo->pGroupSet), sizeof(SDataGroupInfo));
|
||||
|
|
|
@ -2090,6 +2090,7 @@ int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle
|
|||
qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
pTableListInfo->needSortTableByGroupId = pTableScanNode->groupSort;
|
||||
code = generateGroupIdMap(pTableListInfo, pHandle, pTableScanNode->pGroupTags);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
|
|
@ -593,7 +593,10 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
|
|||
if (size > sortBufSize) {
|
||||
// Perform the in-memory sort and then flush data in the buffer into disk.
|
||||
int64_t p = taosGetTimestampUs();
|
||||
blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
|
||||
code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
|
||||
int64_t el = taosGetTimestampUs() - p;
|
||||
pHandle->sortElapsed += el;
|
||||
|
@ -608,7 +611,10 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
|
|||
// Perform the in-memory sort and then flush data in the buffer into disk.
|
||||
int64_t p = taosGetTimestampUs();
|
||||
|
||||
blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
|
||||
int32_t code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
|
||||
int64_t el = taosGetTimestampUs() - p;
|
||||
pHandle->sortElapsed += el;
|
||||
|
|
|
@ -808,6 +808,12 @@ static int32_t translateHistogram(SFunctionNode* pFunc, char* pErrBuf, int32_t l
|
|||
}
|
||||
|
||||
// param1 ~ param3
|
||||
if (((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type != TSDB_DATA_TYPE_BINARY ||
|
||||
((SExprNode*)nodesListGetNode(pFunc->pParameterList, 2))->resType.type != TSDB_DATA_TYPE_BINARY ||
|
||||
((SExprNode*)nodesListGetNode(pFunc->pParameterList, 3))->resType.type != TSDB_DATA_TYPE_BIGINT) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
for (int32_t i = 1; i < numOfParams; ++i) {
|
||||
SNode* pParamNode = nodesListGetNode(pFunc->pParameterList, i);
|
||||
if (QUERY_NODE_VALUE != nodeType(pParamNode)) {
|
||||
|
@ -817,12 +823,11 @@ static int32_t translateHistogram(SFunctionNode* pFunc, char* pErrBuf, int32_t l
|
|||
SValueNode* pValue = (SValueNode*)pParamNode;
|
||||
|
||||
pValue->notReserved = true;
|
||||
}
|
||||
|
||||
if (((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type != TSDB_DATA_TYPE_BINARY ||
|
||||
((SExprNode*)nodesListGetNode(pFunc->pParameterList, 2))->resType.type != TSDB_DATA_TYPE_BINARY ||
|
||||
((SExprNode*)nodesListGetNode(pFunc->pParameterList, 3))->resType.type != TSDB_DATA_TYPE_BIGINT) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
if (i == 3 && pValue->datum.i != 1 && pValue->datum.i != 0) {
|
||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||
"HISTOGRAM function normalized parameter should be 0/1");
|
||||
}
|
||||
}
|
||||
|
||||
pFunc->node.resType = (SDataType){.bytes = 512, .type = TSDB_DATA_TYPE_BINARY};
|
||||
|
@ -842,6 +847,12 @@ static int32_t translateHistogramImpl(SFunctionNode* pFunc, char* pErrBuf, int32
|
|||
}
|
||||
|
||||
// param1 ~ param3
|
||||
if (((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type != TSDB_DATA_TYPE_BINARY ||
|
||||
((SExprNode*)nodesListGetNode(pFunc->pParameterList, 2))->resType.type != TSDB_DATA_TYPE_BINARY ||
|
||||
((SExprNode*)nodesListGetNode(pFunc->pParameterList, 3))->resType.type != TSDB_DATA_TYPE_BIGINT) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
for (int32_t i = 1; i < numOfParams; ++i) {
|
||||
SNode* pParamNode = nodesListGetNode(pFunc->pParameterList, i);
|
||||
if (QUERY_NODE_VALUE != nodeType(pParamNode)) {
|
||||
|
@ -851,12 +862,11 @@ static int32_t translateHistogramImpl(SFunctionNode* pFunc, char* pErrBuf, int32
|
|||
SValueNode* pValue = (SValueNode*)pParamNode;
|
||||
|
||||
pValue->notReserved = true;
|
||||
}
|
||||
|
||||
if (((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type != TSDB_DATA_TYPE_BINARY ||
|
||||
((SExprNode*)nodesListGetNode(pFunc->pParameterList, 2))->resType.type != TSDB_DATA_TYPE_BINARY ||
|
||||
((SExprNode*)nodesListGetNode(pFunc->pParameterList, 3))->resType.type != TSDB_DATA_TYPE_BIGINT) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
if (i == 3 && pValue->datum.i != 1 && pValue->datum.i != 0) {
|
||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||
"HISTOGRAM function normalized parameter should be 0/1");
|
||||
}
|
||||
}
|
||||
|
||||
pFunc->node.resType =
|
||||
|
@ -1606,7 +1616,7 @@ static int32_t translateToJson(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
|
|||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BINARY].bytes, .type = TSDB_DATA_TYPE_BINARY};
|
||||
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_JSON].bytes, .type = TSDB_DATA_TYPE_JSON};
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -2683,7 +2693,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
{
|
||||
.name = "_group_key",
|
||||
.type = FUNCTION_TYPE_GROUP_KEY,
|
||||
.classification = FUNC_MGT_AGG_FUNC,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC,
|
||||
.translateFunc = translateGroupKey,
|
||||
.getEnvFunc = getGroupKeyFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
#include "tglobal.h"
|
||||
#include "thistogram.h"
|
||||
#include "tpercentile.h"
|
||||
#include "query.h"
|
||||
|
||||
#define HISTOGRAM_MAX_BINS_NUM 1000
|
||||
#define MAVG_MAX_POINTS_NUM 1000
|
||||
|
@ -1472,8 +1473,8 @@ void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuple
|
|||
int32_t pageId = pTuplePos->pageId;
|
||||
int32_t offset = pTuplePos->offset;
|
||||
|
||||
if (pTuplePos->pageId != -1) {
|
||||
int32_t numOfCols = taosArrayGetSize(pCtx->pSrcBlock->pDataBlock);
|
||||
if (pTuplePos->pageId != -1 && pCtx->subsidiaries.num > 0) {
|
||||
int32_t numOfCols = pCtx->subsidiaries.num;
|
||||
SFilePage* pPage = getBufPage(pCtx->pBuf, pageId);
|
||||
|
||||
bool* nullList = (bool*)((char*)pPage + offset);
|
||||
|
@ -1484,22 +1485,21 @@ void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuple
|
|||
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
|
||||
|
||||
SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0];
|
||||
int32_t srcSlotId = pFuncParam->pCol->slotId;
|
||||
int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;
|
||||
|
||||
int32_t ps = 0;
|
||||
for (int32_t k = 0; k < srcSlotId; ++k) {
|
||||
SColumnInfoData* pSrcCol = taosArrayGet(pCtx->pSrcBlock->pDataBlock, k);
|
||||
ps += pSrcCol->info.bytes;
|
||||
}
|
||||
|
||||
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
|
||||
if (nullList[srcSlotId]) {
|
||||
ASSERT(pc->pExpr->base.resSchema.bytes == pDstCol->info.bytes);
|
||||
if (nullList[j]) {
|
||||
colDataAppendNULL(pDstCol, rowIndex);
|
||||
} else {
|
||||
colDataAppend(pDstCol, rowIndex, (pStart + ps), false);
|
||||
colDataAppend(pDstCol, rowIndex, pStart, false);
|
||||
}
|
||||
pStart += pDstCol->info.bytes;
|
||||
}
|
||||
|
||||
releaseBufPage(pCtx->pBuf, pPage);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3208,7 +3208,9 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
|
|||
if (pCtx->subsidiaries.num > 0) {
|
||||
saveTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos);
|
||||
}
|
||||
|
||||
#ifdef BUF_PAGE_DEBUG
|
||||
qDebug("page_saveTuple i:%d, item:%p,pageId:%d, offset:%d\n", pEntryInfo->numOfRes, pItem, pItem->tuplePos.pageId, pItem->tuplePos.offset);
|
||||
#endif
|
||||
// allocate the buffer and keep the data of this row into the new allocated buffer
|
||||
pEntryInfo->numOfRes++;
|
||||
taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn,
|
||||
|
@ -3229,7 +3231,9 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
|
|||
if (pCtx->subsidiaries.num > 0) {
|
||||
copyTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos);
|
||||
}
|
||||
|
||||
#ifdef BUF_PAGE_DEBUG
|
||||
qDebug("page_copyTuple pageId:%d, offset:%d", pItem->tuplePos.pageId, pItem->tuplePos.offset);
|
||||
#endif
|
||||
taosheapadjust((void*)pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void*)&type,
|
||||
topBotResComparFn, NULL, !isTopQuery);
|
||||
}
|
||||
|
@ -3239,7 +3243,11 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
|
|||
void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) {
|
||||
SFilePage* pPage = NULL;
|
||||
|
||||
int32_t completeRowSize = pSrcBlock->info.rowSize + (int32_t) taosArrayGetSize(pSrcBlock->pDataBlock) * sizeof(bool);
|
||||
int32_t completeRowSize = pCtx->subsidiaries.num * sizeof(bool);
|
||||
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
|
||||
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
|
||||
completeRowSize += pc->pExpr->base.resSchema.bytes;
|
||||
}
|
||||
|
||||
if (pCtx->curBufPage == -1) {
|
||||
pPage = getNewBufPage(pCtx->pBuf, 0, &pCtx->curBufPage);
|
||||
|
@ -3257,19 +3265,22 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
|
|||
// keep the current row data, extract method
|
||||
int32_t offset = 0;
|
||||
bool* nullList = (bool*)((char*)pPage + pPage->num);
|
||||
char* pStart = (char*)(nullList + sizeof(bool) * (int32_t) taosArrayGetSize(pSrcBlock->pDataBlock));
|
||||
for (int32_t i = 0; i < (int32_t) taosArrayGetSize(pSrcBlock->pDataBlock); ++i) {
|
||||
SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, i);
|
||||
bool isNull = colDataIsNull_s(pCol, rowIndex);
|
||||
if (isNull) {
|
||||
nullList[i] = true;
|
||||
char* pStart = (char*)(nullList + sizeof(bool) * pCtx->subsidiaries.num);
|
||||
for (int32_t i = 0; i < pCtx->subsidiaries.num; ++i) {
|
||||
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[i];
|
||||
|
||||
SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0];
|
||||
int32_t srcSlotId = pFuncParam->pCol->slotId;
|
||||
|
||||
SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId);
|
||||
if ((nullList[i] = colDataIsNull_s(pCol, rowIndex)) == true) {
|
||||
offset += pCol->info.bytes;
|
||||
continue;
|
||||
}
|
||||
|
||||
char* p = colDataGetData(pCol, rowIndex);
|
||||
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
|
||||
memcpy(pStart + offset, p, varDataTLen(p));
|
||||
memcpy(pStart + offset, p, (pCol->info.type == TSDB_DATA_TYPE_JSON) ? getJsonValueLen(p): varDataTLen(p));
|
||||
} else {
|
||||
memcpy(pStart + offset, p, pCol->info.bytes);
|
||||
}
|
||||
|
@ -3287,14 +3298,18 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
|
|||
void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) {
|
||||
SFilePage* pPage = getBufPage(pCtx->pBuf, pPos->pageId);
|
||||
|
||||
int32_t numOfCols = taosArrayGetSize(pSrcBlock->pDataBlock);
|
||||
int32_t numOfCols = pCtx->subsidiaries.num;
|
||||
|
||||
bool* nullList = (bool*)((char*)pPage + pPos->offset);
|
||||
char* pStart = (char*)(nullList + numOfCols * sizeof(bool));
|
||||
|
||||
int32_t offset = 0;
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, i);
|
||||
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[i];
|
||||
SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0];
|
||||
int32_t srcSlotId = pFuncParam->pCol->slotId;
|
||||
|
||||
SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId);
|
||||
if ((nullList[i] = colDataIsNull_s(pCol, rowIndex)) == true) {
|
||||
offset += pCol->info.bytes;
|
||||
continue;
|
||||
|
@ -3302,7 +3317,7 @@ void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
|
|||
|
||||
char* p = colDataGetData(pCol, rowIndex);
|
||||
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
|
||||
memcpy(pStart + offset, p, varDataTLen(p));
|
||||
memcpy(pStart + offset, p, (pCol->info.type == TSDB_DATA_TYPE_JSON) ? getJsonValueLen(p): varDataTLen(p));
|
||||
} else {
|
||||
memcpy(pStart + offset, p, pCol->info.bytes);
|
||||
}
|
||||
|
@ -3316,7 +3331,7 @@ void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
|
|||
|
||||
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||
STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
|
||||
STopBotRes* pRes = getTopBotOutputInfo(pCtx);
|
||||
|
||||
int16_t type = pCtx->input.pData[0]->info.type;
|
||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||
|
@ -3333,7 +3348,9 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
} else {
|
||||
colDataAppend(pCol, currentRow, (const char*)&pItem->v.i, false);
|
||||
}
|
||||
|
||||
#ifdef BUF_PAGE_DEBUG
|
||||
qDebug("page_finalize i:%d,item:%p,pageId:%d, offset:%d\n", i, pItem, pItem->tuplePos.pageId, pItem->tuplePos.offset);
|
||||
#endif
|
||||
setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow);
|
||||
currentRow += 1;
|
||||
}
|
||||
|
@ -5629,8 +5646,6 @@ int32_t groupKeyFunction(SqlFunctionCtx* pCtx) {
|
|||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||
|
||||
int32_t bytes = pInputCol->info.bytes;
|
||||
|
||||
int32_t startIndex = pInput->startRowIndex;
|
||||
|
||||
//escape rest of data blocks to avoid first entry to be overwritten.
|
||||
|
@ -5645,7 +5660,11 @@ int32_t groupKeyFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
|
||||
char* data = colDataGetData(pInputCol, startIndex);
|
||||
memcpy(pInfo->data, data, bytes);
|
||||
if (IS_VAR_DATA_TYPE(pInputCol->info.type)) {
|
||||
memcpy(pInfo->data, data, (pInputCol->info.type == TSDB_DATA_TYPE_JSON) ? getJsonValueLen(data): varDataTLen(data));
|
||||
} else {
|
||||
memcpy(pInfo->data, data, pInputCol->info.bytes);
|
||||
}
|
||||
pInfo->hasResult = true;
|
||||
|
||||
_group_key_over:
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
#include "querynodes.h"
|
||||
#include "taos.h"
|
||||
#include "taoserror.h"
|
||||
#include "tdatablock.h"
|
||||
|
||||
#define COPY_SCALAR_FIELD(fldname) \
|
||||
do { \
|
||||
|
@ -164,7 +165,15 @@ static int32_t valueNodeCopy(const SValueNode* pSrc, SValueNode* pDst) {
|
|||
memcpy(pDst->datum.p, pSrc->datum.p, len);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_JSON:
|
||||
case TSDB_DATA_TYPE_JSON:{
|
||||
int32_t len = getJsonValueLen(pSrc->datum.p);
|
||||
pDst->datum.p = taosMemoryCalloc(1, len);
|
||||
if (NULL == pDst->datum.p) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
memcpy(pDst->datum.p, pSrc->datum.p, len);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_DECIMAL:
|
||||
case TSDB_DATA_TYPE_BLOB:
|
||||
case TSDB_DATA_TYPE_MEDIUMBLOB:
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
#include "querynodes.h"
|
||||
#include "taoserror.h"
|
||||
#include "tjson.h"
|
||||
#include "tdatablock.h"
|
||||
|
||||
static int32_t nodeToJson(const void* pObj, SJson* pJson);
|
||||
static int32_t jsonToNode(const SJson* pJson, void* pObj);
|
||||
|
@ -2629,7 +2630,18 @@ static int32_t datumToJson(const void* pObj, SJson* pJson) {
|
|||
case TSDB_DATA_TYPE_VARBINARY:
|
||||
code = tjsonAddStringToObject(pJson, jkValueDatum, varDataVal(pNode->datum.p));
|
||||
break;
|
||||
case TSDB_DATA_TYPE_JSON:
|
||||
case TSDB_DATA_TYPE_JSON:{
|
||||
int32_t len = getJsonValueLen(pNode->datum.p);
|
||||
char* buf = taosMemoryCalloc( len * 2 + 1, sizeof(char));
|
||||
code = taosHexEncode(pNode->datum.p, buf, len);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosMemoryFree(buf);
|
||||
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||
}
|
||||
code = tjsonAddStringToObject(pJson, jkValueDatum, buf);
|
||||
taosMemoryFree(buf);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_DECIMAL:
|
||||
case TSDB_DATA_TYPE_BLOB:
|
||||
// todo
|
||||
|
@ -2752,7 +2764,30 @@ static int32_t jsonToDatum(const SJson* pJson, void* pObj) {
|
|||
}
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_JSON:
|
||||
case TSDB_DATA_TYPE_JSON:{
|
||||
pNode->datum.p = taosMemoryCalloc(1, pNode->node.resType.bytes);
|
||||
if (NULL == pNode->datum.p) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
break;
|
||||
}
|
||||
char* buf = taosMemoryCalloc(1, pNode->node.resType.bytes * 2 + 1);
|
||||
if (NULL == buf) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
break;
|
||||
}
|
||||
code = tjsonGetStringValue(pJson, jkValueDatum, buf);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosMemoryFree(buf);
|
||||
break;
|
||||
}
|
||||
code = taosHexDecode(buf, pNode->datum.p, pNode->node.resType.bytes);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosMemoryFree(buf);
|
||||
break;
|
||||
}
|
||||
taosMemoryFree(buf);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_DECIMAL:
|
||||
case TSDB_DATA_TYPE_BLOB:
|
||||
// todo
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
#include "taos.h"
|
||||
#include "taoserror.h"
|
||||
#include "thash.h"
|
||||
#include "tdatablock.h"
|
||||
|
||||
static SNode* makeNode(ENodeType type, size_t size) {
|
||||
SNode* p = taosMemoryCalloc(1, size);
|
||||
|
@ -1675,6 +1676,10 @@ void nodesValueNodeToVariant(const SValueNode* pNode, SVariant* pVal) {
|
|||
pVal->pz[pVal->nLen + VARSTR_HEADER_SIZE] = 0;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_JSON:
|
||||
pVal->nLen = getJsonValueLen(pNode->datum.p);
|
||||
pVal->pz = taosMemoryMalloc(pVal->nLen);
|
||||
memcpy(pVal->pz, pNode->datum.p, pVal->nLen);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_DECIMAL:
|
||||
case TSDB_DATA_TYPE_BLOB:
|
||||
// todo
|
||||
|
|
|
@ -62,7 +62,6 @@ int32_t getNumOfColumns(const STableMeta* pTableMeta);
|
|||
int32_t getNumOfTags(const STableMeta* pTableMeta);
|
||||
STableComInfo getTableInfo(const STableMeta* pTableMeta);
|
||||
STableMeta* tableMetaDup(const STableMeta* pTableMeta);
|
||||
int32_t parseJsontoTagData(const char* json, SArray* pTagVals, STag** ppTag, SMsgBuf* pMsgBuf);
|
||||
|
||||
int32_t trimString(const char* src, int32_t len, char* dst, int32_t dlen);
|
||||
|
||||
|
|
|
@ -219,6 +219,7 @@ int32_t buildInvalidOperationMsg(SMsgBuf* pBuf, const char* msg) {
|
|||
}
|
||||
|
||||
int32_t buildSyntaxErrMsg(SMsgBuf* pBuf, const char* additionalInfo, const char* sourceStr) {
|
||||
if(pBuf == NULL) return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
|
||||
const char* msgFormat1 = "syntax error near \'%s\'";
|
||||
const char* msgFormat2 = "syntax error near \'%s\' (%s)";
|
||||
const char* msgFormat3 = "%s";
|
||||
|
@ -346,7 +347,7 @@ static bool isValidateTag(char* input) {
|
|||
return true;
|
||||
}
|
||||
|
||||
int32_t parseJsontoTagData(const char* json, SArray* pTagVals, STag** ppTag, SMsgBuf* pMsgBuf) {
|
||||
int32_t parseJsontoTagData(const char* json, SArray* pTagVals, STag** ppTag, void* pMsgBuf) {
|
||||
int32_t retCode = TSDB_CODE_SUCCESS;
|
||||
cJSON* root = NULL;
|
||||
SHashObj* keyHash = NULL;
|
||||
|
|
|
@ -305,18 +305,21 @@ int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t
|
|||
|
||||
char* parseTagDatatoJson(void* p) {
|
||||
char* string = NULL;
|
||||
cJSON* json = cJSON_CreateObject();
|
||||
if (json == NULL) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
SArray* pTagVals = NULL;
|
||||
cJSON* json = NULL;
|
||||
if (tTagToValArray((const STag*)p, &pTagVals) != 0) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
int16_t nCols = taosArrayGetSize(pTagVals);
|
||||
if (nCols == 0) {
|
||||
goto end;
|
||||
}
|
||||
char tagJsonKey[256] = {0};
|
||||
json = cJSON_CreateObject();
|
||||
if (json == NULL) {
|
||||
goto end;
|
||||
}
|
||||
for (int j = 0; j < nCols; ++j) {
|
||||
STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
|
||||
// json key encode by binary
|
||||
|
@ -374,6 +377,10 @@ char* parseTagDatatoJson(void* p) {
|
|||
string = cJSON_PrintUnformatted(json);
|
||||
end:
|
||||
cJSON_Delete(json);
|
||||
taosArrayDestroy(pTagVals);
|
||||
if(string == NULL){
|
||||
string = strdup(TSDB_DATA_NULL_STR_L);
|
||||
}
|
||||
return string;
|
||||
}
|
||||
|
||||
|
|
|
@ -192,6 +192,9 @@ int8_t filterGetCompFuncIdx(int32_t type, int32_t optr) {
|
|||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
return 18;
|
||||
case TSDB_DATA_TYPE_JSON:
|
||||
terrno = TSDB_CODE_QRY_JSON_IN_ERROR;
|
||||
return 0;
|
||||
default:
|
||||
assert(0);
|
||||
}
|
||||
|
@ -215,6 +218,9 @@ int8_t filterGetCompFuncIdx(int32_t type, int32_t optr) {
|
|||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
return 24;
|
||||
case TSDB_DATA_TYPE_JSON:
|
||||
terrno = TSDB_CODE_QRY_JSON_IN_ERROR;
|
||||
return 0;
|
||||
default:
|
||||
assert(0);
|
||||
}
|
||||
|
|
|
@ -551,7 +551,9 @@ int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *outp
|
|||
SScalarParam* pLeft = ¶ms[0];
|
||||
SScalarParam* pRight = paramNum > 1 ? ¶ms[1] : NULL;
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
OperatorFn(pLeft, pRight, output, TSDB_ORDER_ASC);
|
||||
code = terrno;
|
||||
|
||||
_return:
|
||||
for (int32_t i = 0; i < paramNum; ++i) {
|
||||
|
@ -693,7 +695,11 @@ EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) {
|
|||
res->node.resType.scale = output.columnData->info.scale;
|
||||
res->node.resType.precision = output.columnData->info.precision;
|
||||
int32_t type = output.columnData->info.type;
|
||||
if (IS_VAR_DATA_TYPE(type)) {
|
||||
if (type == TSDB_DATA_TYPE_JSON){
|
||||
int32_t len = getJsonValueLen(output.columnData->pData);
|
||||
res->datum.p = taosMemoryCalloc(len, 1);
|
||||
memcpy(res->datum.p, output.columnData->pData, len);
|
||||
} else if (IS_VAR_DATA_TYPE(type)) {
|
||||
res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1);
|
||||
memcpy(res->datum.p, output.columnData->pData, varDataTLen(output.columnData->pData));
|
||||
} else {
|
||||
|
|
|
@ -1152,42 +1152,30 @@ int32_t toJsonFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
|
|||
|
||||
char tmp[TSDB_MAX_JSON_TAG_LEN] = {0};
|
||||
for (int32_t i = 0; i < pInput[0].numOfRows; ++i) {
|
||||
if (colDataIsNull_s(pInput[0].columnData, i)) {
|
||||
colDataAppendNULL(pOutput->columnData, i);
|
||||
continue;
|
||||
}
|
||||
char *input = pInput[0].columnData->pData + pInput[0].columnData->varmeta.offset[i];
|
||||
SArray* pTagVals = taosArrayInit(8, sizeof(STagVal));
|
||||
STag* pTag = NULL;
|
||||
|
||||
if(type == TSDB_DATA_TYPE_NCHAR){
|
||||
if (varDataTLen(input) > TSDB_MAX_JSON_TAG_LEN){
|
||||
colDataAppendNULL(pOutput->columnData, i);
|
||||
continue;
|
||||
}
|
||||
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), tmp);
|
||||
if (len < 0) {
|
||||
colDataAppendNULL(pOutput->columnData, i);
|
||||
continue;
|
||||
}
|
||||
tmp[len] = 0;
|
||||
if (colDataIsNull_s(pInput[0].columnData, i)) {
|
||||
tTagNew(pTagVals, 1, true, &pTag);
|
||||
}else{
|
||||
char *input = pInput[0].columnData->pData + pInput[0].columnData->varmeta.offset[i];
|
||||
if (varDataLen(input) > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE){
|
||||
colDataAppendNULL(pOutput->columnData, i);
|
||||
continue;
|
||||
taosArrayDestroy(pTagVals);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
memcpy(tmp, varDataVal(input), varDataLen(input));
|
||||
tmp[varDataLen(input)] = 0;
|
||||
if(parseJsontoTagData(tmp, pTagVals, &pTag, NULL)){
|
||||
tTagNew(pTagVals, 1, true, &pTag);
|
||||
}
|
||||
}
|
||||
|
||||
if(!tjsonValidateJson(tmp)){
|
||||
colDataAppendNULL(pOutput->columnData, i);
|
||||
continue;
|
||||
}
|
||||
|
||||
colDataAppend(pOutput->columnData, i, input, false);
|
||||
colDataAppend(pOutput->columnData, i, (const char*)pTag, false);
|
||||
tTagFree(pTag);
|
||||
taosArrayDestroy(pTagVals);
|
||||
}
|
||||
|
||||
pOutput->numOfRows = pInput->numOfRows;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -160,6 +160,9 @@ int64_t getVectorBigintValue_JSON(void *src, int32_t index){
|
|||
return 0;
|
||||
} else if(*data == TSDB_DATA_TYPE_NCHAR) { // json inner type can not be BINARY
|
||||
convertNcharToDouble(data+CHAR_BYTES, &out);
|
||||
} else if(tTagIsJson(data)){
|
||||
terrno = TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR;
|
||||
return 0;
|
||||
} else {
|
||||
convertNumberToNumber(data+CHAR_BYTES, &out, *data, TSDB_DATA_TYPE_DOUBLE);
|
||||
}
|
||||
|
@ -416,6 +419,9 @@ int32_t vectorConvertFromVarData(const SScalarParam* pIn, SScalarParam* pOut, in
|
|||
else if(*data == TSDB_DATA_TYPE_NCHAR) {
|
||||
data += CHAR_BYTES;
|
||||
convertType = TSDB_DATA_TYPE_NCHAR;
|
||||
} else if(tTagIsJson(data)){
|
||||
terrno = TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR;
|
||||
return terrno;
|
||||
} else {
|
||||
convertNumberToNumber(data+CHAR_BYTES, colDataGetNumData(pOut->columnData, i), *data, outType);
|
||||
continue;
|
||||
|
@ -461,6 +467,9 @@ double getVectorDoubleValue_JSON(void *src, int32_t index){
|
|||
return out;
|
||||
} else if(*data == TSDB_DATA_TYPE_NCHAR) { // json inner type can not be BINARY
|
||||
convertNcharToDouble(data+CHAR_BYTES, &out);
|
||||
} else if(tTagIsJson(data)){
|
||||
terrno = TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR;
|
||||
return 0;
|
||||
} else{
|
||||
convertNumberToNumber(data+CHAR_BYTES, &out, *data, TSDB_DATA_TYPE_DOUBLE);
|
||||
}
|
||||
|
@ -493,10 +502,18 @@ bool convertJsonValue(__compar_fn_t *fp, int32_t optr, int8_t typeLeft, int8_t t
|
|||
}
|
||||
|
||||
if(typeLeft == TSDB_DATA_TYPE_JSON){
|
||||
if(tTagIsJson(*pLeftData)){
|
||||
terrno = TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR;
|
||||
return false;
|
||||
}
|
||||
typeLeft = **pLeftData;
|
||||
(*pLeftData) ++;
|
||||
}
|
||||
if(typeRight == TSDB_DATA_TYPE_JSON){
|
||||
if(tTagIsJson(*pLeftData)){
|
||||
terrno = TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR;
|
||||
return false;
|
||||
}
|
||||
typeRight = **pRightData;
|
||||
(*pRightData) ++;
|
||||
}
|
||||
|
@ -1576,7 +1593,11 @@ void vectorBitOr(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut,
|
|||
void vectorCompareImpl(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord, int32_t optr) {
|
||||
int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->numOfRows, pRight->numOfRows) - 1;
|
||||
int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1;
|
||||
|
||||
__compar_fn_t fp = filterGetCompFunc(GET_PARAM_TYPE(pLeft), optr);
|
||||
if(terrno != TSDB_CODE_SUCCESS){
|
||||
return;
|
||||
}
|
||||
|
||||
pOut->numOfRows = TMAX(pLeft->numOfRows, pRight->numOfRows);
|
||||
|
||||
|
@ -1709,6 +1730,7 @@ void vectorIsTrue(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut,
|
|||
STagVal getJsonValue(char *json, char *key, bool *isExist) {
|
||||
STagVal val = {.pKey = key};
|
||||
if (tTagIsJson((const STag *)json) == false){
|
||||
terrno = TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR;
|
||||
if(isExist){
|
||||
*isExist = false;
|
||||
}
|
||||
|
|
|
@ -202,7 +202,7 @@ int32_t taosHexEncode(const char *src, char *dst, int32_t len) {
|
|||
}
|
||||
|
||||
for (int32_t i = 0; i < len; ++i) {
|
||||
sprintf(dst + i * 2, "%02x", src[i] & 0xff);
|
||||
sprintf(dst + i * 2, "%02x", src[i]);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -213,10 +213,10 @@ int32_t taosHexDecode(const char *src, char *dst, int32_t len) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
uint16_t hn, ln, out;
|
||||
uint8_t hn, ln, out;
|
||||
for (int i = 0, j = 0; i < len * 2; i += 2, ++j ) {
|
||||
hn = src[i] > '9' ? src[i] - 'A' + 10 : src[i] - '0';
|
||||
ln = src[i + 1] > '9' ? src[i + 1] - 'A' + 10 : src[i + 1] - '0';
|
||||
hn = src[i] > '9' ? src[i] - 'a' + 10 : src[i] - '0';
|
||||
ln = src[i + 1] > '9' ? src[i + 1] - 'a' + 10 : src[i + 1] - '0';
|
||||
|
||||
out = (hn << 4) | ln;
|
||||
memcpy(dst + j, &out, 1);
|
||||
|
|
|
@ -394,6 +394,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_DUPLICATTED_OPERATION, "Duplicatted operation
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_MSG_ERROR, "Task message error")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JOB_FREED, "Job already freed")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_STATUS_ERROR, "Task status error")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JSON_IN_ERROR, "Json not support in in/notin operator")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR, "Json not support in this place")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JSON_IN_GROUP_ERROR, "Json not support in group/partition by")
|
||||
|
||||
// grant
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, "License expired")
|
||||
|
|
|
@ -193,7 +193,9 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
|
|||
|
||||
char* pDataBuf = pg->pData;
|
||||
memset(pDataBuf, 0, getAllocPageSize(pBuf->pageSize));
|
||||
|
||||
#ifdef BUF_PAGE_DEBUG
|
||||
uDebug("page_flush %p, pageId:%d, offset:%d", pDataBuf, pg->pageId, pg->offset);
|
||||
#endif
|
||||
pg->length = size; // on disk size
|
||||
return pDataBuf;
|
||||
}
|
||||
|
@ -440,6 +442,9 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) {
|
|||
}
|
||||
|
||||
((void**)pi->pData)[0] = pi;
|
||||
#ifdef BUF_PAGE_DEBUG
|
||||
uDebug("page_getNewBufPage , pi->pData:%p, pageId:%d, offset:%"PRId64, pi->pData, pi->pageId, pi->offset);
|
||||
#endif
|
||||
return (void*)(GET_DATA_PAYLOAD(pi));
|
||||
}
|
||||
|
||||
|
@ -462,7 +467,9 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
|
|||
|
||||
lruListMoveToFront(pBuf->lruList, (*pi));
|
||||
(*pi)->used = true;
|
||||
|
||||
#ifdef BUF_PAGE_DEBUG
|
||||
uDebug("page_getBufPage1 pageId:%d, offset:%"PRId64, (*pi)->pageId, (*pi)->offset);
|
||||
#endif
|
||||
return (void*)(GET_DATA_PAYLOAD(*pi));
|
||||
} else { // not in memory
|
||||
assert((*pi)->pData == NULL && (*pi)->pn == NULL && (((*pi)->length >= 0 && (*pi)->offset >= 0) || ((*pi)->length == -1 && (*pi)->offset == -1)));
|
||||
|
@ -494,7 +501,9 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
|
|||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef BUF_PAGE_DEBUG
|
||||
uDebug("page_getBufPage2 pageId:%d, offset:%"PRId64, (*pi)->pageId, (*pi)->offset);
|
||||
#endif
|
||||
return (void*)(GET_DATA_PAYLOAD(*pi));
|
||||
}
|
||||
}
|
||||
|
@ -506,8 +515,11 @@ void releaseBufPage(SDiskbasedBuf* pBuf, void* page) {
|
|||
}
|
||||
|
||||
void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) {
|
||||
#ifdef BUF_PAGE_DEBUG
|
||||
uDebug("page_releaseBufPageInfo pageId:%d, used:%d, offset:%"PRId64, pi->pageId, pi->used, pi->offset);
|
||||
#endif
|
||||
assert(pi->pData != NULL && pi->used == true);
|
||||
|
||||
// assert(pi->pData != NULL);
|
||||
pi->used = false;
|
||||
pBuf->statis.releasePages += 1;
|
||||
}
|
||||
|
|
|
@ -38,7 +38,10 @@ class TDTestCase:
|
|||
tdSql.init(conn.cursor(), logSql)
|
||||
|
||||
def run(self):
|
||||
tdSql.prepare()
|
||||
# tdSql.prepare()
|
||||
tdSql.execute('drop database if exists db')
|
||||
tdSql.execute('create database db vgroups 1')
|
||||
tdSql.execute('use db')
|
||||
print("============== STEP 1 ===== prepare data & validate json string")
|
||||
tdSql.error("create table if not exists jsons1(ts timestamp, dataInt int, dataBool bool, dataStr nchar(50), dataStrBin binary(150)) tags(jtag json, tagint int)")
|
||||
tdSql.error("create table if not exists jsons1(ts timestamp, data json) tags(tagint int)")
|
||||
|
@ -56,6 +59,22 @@ class TDTestCase:
|
|||
tdSql.query("select jtag from jsons1_8")
|
||||
tdSql.checkData(0, 0, '{" ":90,"1tag$":2,"tag1":null}')
|
||||
|
||||
tdSql.query("select ts,jtag from jsons1 order by ts limit 2,3")
|
||||
tdSql.checkData(0, 0, '2020-06-02 09:17:08.000')
|
||||
tdSql.checkData(0, 1, '{"tag1":5,"tag2":"beijing"}')
|
||||
tdSql.checkData(1, 0, '2020-06-02 09:17:48.000')
|
||||
tdSql.checkData(1, 1, '{"tag1":false,"tag2":"beijing"}')
|
||||
tdSql.checkData(2, 0, '2020-06-02 09:18:48.000')
|
||||
tdSql.checkData(2, 1, '{"tag1":null,"tag2":"shanghai","tag3":"hello"}')
|
||||
|
||||
tdSql.query("select ts,jtag->'tag1' from jsons1 order by ts limit 2,3")
|
||||
tdSql.checkData(0, 0, '2020-06-02 09:17:08.000')
|
||||
tdSql.checkData(0, 1, '5.000000000')
|
||||
tdSql.checkData(1, 0, '2020-06-02 09:17:48.000')
|
||||
tdSql.checkData(1, 1, 'false')
|
||||
tdSql.checkData(2, 0, '2020-06-02 09:18:48.000')
|
||||
tdSql.checkData(2, 1, 'null')
|
||||
|
||||
# test empty json string, save as jtag is NULL
|
||||
tdSql.execute("insert into jsons1_9 using jsons1 tags('\t') values (1591062328000, 24, NULL, '你就会', '2sdw')")
|
||||
tdSql.execute("CREATE TABLE if not exists jsons1_10 using jsons1 tags('')")
|
||||
|
@ -218,9 +237,19 @@ class TDTestCase:
|
|||
|
||||
# test where with json tag
|
||||
tdSql.query("select * from jsons1_1 where jtag is not null")
|
||||
# tdSql.query("select * from jsons1 where jtag='{\"tag1\":11,\"tag2\":\"\"}'")
|
||||
tdSql.error("select * from jsons1 where jtag='{\"tag1\":11,\"tag2\":\"\"}'")
|
||||
tdSql.error("select * from jsons1 where jtag->'tag1'={}")
|
||||
|
||||
# test json error
|
||||
tdSql.error("select jtag + 1 from jsons1")
|
||||
tdSql.error("select jtag > 1 from jsons1")
|
||||
tdSql.error("select jtag like \"1\" from jsons1")
|
||||
tdSql.error("select jtag in (\"1\") from jsons1")
|
||||
tdSql.error("select jtag from jsons1 where jtag > 1")
|
||||
tdSql.error("select jtag from jsons1 where jtag like 'fsss'")
|
||||
tdSql.error("select jtag from jsons1 where jtag in (1)")
|
||||
|
||||
|
||||
# where json value is string
|
||||
tdSql.query("select * from jsons1 where jtag->'tag2'='beijing'")
|
||||
tdSql.checkRows(2)
|
||||
|
@ -369,7 +398,7 @@ class TDTestCase:
|
|||
tdSql.checkRows(2)
|
||||
|
||||
# test where condition in no support in
|
||||
# tdSql.error("select * from jsons1 where jtag->'tag1' in ('beijing')")
|
||||
tdSql.error("select * from jsons1 where jtag->'tag1' in ('beijing')")
|
||||
|
||||
# test where condition match/nmath
|
||||
tdSql.query("select * from jsons1 where jtag->'tag1' match 'ma'")
|
||||
|
@ -387,8 +416,8 @@ class TDTestCase:
|
|||
tdSql.execute("insert into jsons1_14 using jsons1 tags('{\"tag1\":\"收到货\",\"tag2\":\"\",\"tag3\":null}') values(1591062628000, 2, NULL, '你就会', 'dws')")
|
||||
tdSql.query("select distinct jtag->'tag1' from jsons1")
|
||||
tdSql.checkRows(8)
|
||||
tdSql.query("select distinct jtag from jsons1")
|
||||
tdSql.checkRows(9)
|
||||
# tdSql.query("select distinct jtag from jsons1")
|
||||
# tdSql.checkRows(9)
|
||||
|
||||
#test dumplicate key with normal colomn
|
||||
tdSql.execute("INSERT INTO jsons1_15 using jsons1 tags('{\"tbname\":\"tt\",\"databool\":true,\"datastr\":\"是是是\"}') values(1591060828000, 4, false, 'jjsf', \"你就会\")")
|
||||
|
@ -424,62 +453,56 @@ class TDTestCase:
|
|||
tdSql.checkData(7, 1, "false")
|
||||
|
||||
|
||||
# tdSql.error("select count(*) from jsons1 group by jtag")
|
||||
# tdSql.error("select count(*) from jsons1 partition by jtag")
|
||||
# tdSql.error("select count(*) from jsons1 group by jtag order by jtag")
|
||||
tdSql.error("select count(*) from jsons1 group by jtag")
|
||||
tdSql.error("select count(*) from jsons1 partition by jtag")
|
||||
tdSql.error("select count(*) from jsons1 group by jtag order by jtag")
|
||||
tdSql.error("select count(*) from jsons1 group by jtag->'tag1' order by jtag->'tag2'")
|
||||
tdSql.error("select count(*) from jsons1 group by jtag->'tag1' order by jtag")
|
||||
# tdSql.query("select count(*),jtag->'tag1' from jsons1 group by jtag->'tag1' order by jtag->'tag1' desc")
|
||||
# tdSql.checkRows(8)
|
||||
# tdSql.checkData(0, 0, 2)
|
||||
# tdSql.checkData(0, 1, '"femail"')
|
||||
# tdSql.checkData(1, 0, 2)
|
||||
# tdSql.checkData(1, 1, '"收到货"')
|
||||
# tdSql.checkData(2, 0, 1)
|
||||
# tdSql.checkData(2, 1, "11.000000000")
|
||||
# tdSql.checkData(5, 0, 1)
|
||||
# tdSql.checkData(5, 1, "false")
|
||||
# tdSql.checkData(6, 0, 1)
|
||||
# tdSql.checkData(6, 1, "null")
|
||||
# tdSql.checkData(7, 0, 2)
|
||||
# tdSql.checkData(7, 1, None)
|
||||
tdSql.query("select count(*),jtag->'tag1' from jsons1 group by jtag->'tag1' order by jtag->'tag1' desc")
|
||||
tdSql.checkRows(8)
|
||||
tdSql.checkData(0, 0, 2)
|
||||
tdSql.checkData(0, 1, '"femail"')
|
||||
tdSql.checkData(1, 0, 2)
|
||||
tdSql.checkData(1, 1, '"收到货"')
|
||||
tdSql.checkData(2, 0, 1)
|
||||
tdSql.checkData(2, 1, "11.000000000")
|
||||
tdSql.checkData(5, 0, 1)
|
||||
tdSql.checkData(5, 1, "false")
|
||||
|
||||
tdSql.query("select count(*),jtag->'tag1' from jsons1 group by jtag->'tag1' order by jtag->'tag1' asc")
|
||||
tdSql.checkRows(8)
|
||||
tdSql.checkData(0, 1, None)
|
||||
tdSql.checkData(2, 0, 1)
|
||||
tdSql.checkData(2, 1, "false")
|
||||
tdSql.checkData(5, 0, 1)
|
||||
tdSql.checkData(5, 1, "11.000000000")
|
||||
tdSql.checkData(7, 0, 2)
|
||||
tdSql.checkData(7, 1, '"femail"')
|
||||
|
||||
# tdSql.query("select count(*),jtag->'tag1' from jsons1 group by jtag->'tag1' order by jtag->'tag1' asc")
|
||||
# tdSql.checkRows(8)
|
||||
# tdSql.checkData(0, 0, 2)
|
||||
# tdSql.checkData(0, 1, None)
|
||||
# tdSql.checkData(2, 0, 1)
|
||||
# tdSql.checkData(2, 1, "false")
|
||||
# tdSql.checkData(5, 0, 1)
|
||||
# tdSql.checkData(5, 1, "11.000000000")
|
||||
# tdSql.checkData(7, 0, 2)
|
||||
# tdSql.checkData(7, 1, '"femail"')
|
||||
#
|
||||
# test stddev with group by json tag
|
||||
# tdSql.query("select stddev(dataint),jtag->'tag1' from jsons1 group by jtag->'tag1' order by jtag->'tag1'")
|
||||
# tdSql.checkRows(8)
|
||||
# tdSql.checkData(0, 0, 10)
|
||||
# tdSql.checkData(0, 1, None)
|
||||
# tdSql.checkData(4, 0, 0)
|
||||
# tdSql.checkData(4, 1, "5.000000000")
|
||||
# tdSql.checkData(7, 0, 11)
|
||||
# tdSql.checkData(7, 1, '"femail"')
|
||||
#
|
||||
# res = tdSql.getColNameList("select stddev(dataint),jsons1.jtag->'tag1' from jsons1 group by jsons1.jtag->'tag1' order by jtag->'tag1'")
|
||||
# cname_list = []
|
||||
# cname_list.append("stddev(dataint)")
|
||||
# cname_list.append("jsons1.jtag->'tag1'")
|
||||
# tdSql.checkColNameList(res, cname_list)
|
||||
tdSql.query("select stddev(dataint),jtag->'tag1' from jsons1 group by jtag->'tag1' order by jtag->'tag1'")
|
||||
tdSql.checkRows(8)
|
||||
tdSql.checkData(0, 1, None)
|
||||
tdSql.checkData(4, 0, 0)
|
||||
tdSql.checkData(4, 1, "5.000000000")
|
||||
tdSql.checkData(7, 0, 11)
|
||||
tdSql.checkData(7, 1, '"femail"')
|
||||
|
||||
res = tdSql.getColNameList("select stddev(dataint),jsons1.jtag->'tag1' from jsons1 group by jsons1.jtag->'tag1' order by jtag->'tag1'")
|
||||
cname_list = []
|
||||
cname_list.append("stddev(dataint)")
|
||||
cname_list.append("jsons1.jtag->'tag1'")
|
||||
tdSql.checkColNameList(res, cname_list)
|
||||
|
||||
# test top/bottom with group by json tag
|
||||
# tdSql.query("select top(dataint,2),jtag->'tag1' from jsons1 group by jtag->'tag1' order by jtag->'tag1'")
|
||||
# tdSql.checkRows(11)
|
||||
# tdSql.checkData(0, 1, None)
|
||||
# tdSql.checkData(2, 0, 4)
|
||||
# tdSql.checkData(3, 0, 3)
|
||||
# tdSql.checkData(3, 1, "false")
|
||||
# tdSql.checkData(8, 0, 2)
|
||||
# tdSql.checkData(10, 1, '"femail"')
|
||||
tdSql.query("select top(dataint,2),jtag->'tag1' from jsons1 group by jtag->'tag1' order by jtag->'tag1'")
|
||||
tdSql.checkRows(11)
|
||||
tdSql.checkData(0, 1, None)
|
||||
tdSql.checkData(2, 0, 4)
|
||||
tdSql.checkData(3, 0, 3)
|
||||
tdSql.checkData(3, 1, "false")
|
||||
tdSql.checkData(8, 0, 2)
|
||||
tdSql.checkData(10, 1, '"femail"')
|
||||
|
||||
# test having
|
||||
# tdSql.query("select count(*),jtag->'tag1' from jsons1 group by jtag->'tag1' having count(*) > 1")
|
||||
|
@ -492,6 +515,7 @@ class TDTestCase:
|
|||
tdSql.checkData(5, 0, '{"tag1":false,"tag2":"beijing"}')
|
||||
|
||||
tdSql.error("select jtag->'tag1' from (select jtag->'tag1', dataint from jsons1)")
|
||||
tdSql.error("select t->'tag1' from (select jtag->'tag1' as t, dataint from jsons1)")
|
||||
# tdSql.query("select ts,jtag->'tag1' from (select jtag->'tag1',tbname,ts from jsons1 order by ts)")
|
||||
# tdSql.checkRows(11)
|
||||
# tdSql.checkData(1, 1, "jsons1_1")
|
||||
|
@ -519,9 +543,10 @@ class TDTestCase:
|
|||
tdSql.checkData(0, 0, 10)
|
||||
tdSql.query("select avg(dataint) from jsons1 where jtag is not null")
|
||||
tdSql.checkData(0, 0, 5.3)
|
||||
# tdSql.query("select twa(dataint) from jsons1 where jtag is not null")
|
||||
# tdSql.checkData(0, 0, 36)
|
||||
# tdSql.error("select irate(dataint) from jsons1 where jtag is not null")
|
||||
tdSql.query("select twa(dataint) from jsons1 where jtag is not null")
|
||||
tdSql.checkData(0, 0, 28.386363636363637)
|
||||
tdSql.query("select irate(dataint) from jsons1 where jtag is not null")
|
||||
|
||||
tdSql.query("select sum(dataint) from jsons1 where jtag->'tag1' is not null")
|
||||
tdSql.checkData(0, 0, 45)
|
||||
tdSql.query("select stddev(dataint) from jsons1 where jtag->'tag1'>1")
|
||||
|
@ -549,9 +574,9 @@ class TDTestCase:
|
|||
|
||||
#test calculation function:diff/derivative/spread/ceil/floor/round/
|
||||
tdSql.query("select diff(dataint) from jsons1 where jtag->'tag1'>1")
|
||||
# tdSql.checkRows(2)
|
||||
# tdSql.checkData(0, 0, -1)
|
||||
# tdSql.checkData(1, 0, 10)
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkData(0, 0, -1)
|
||||
tdSql.checkData(1, 0, 10)
|
||||
tdSql.query("select derivative(dataint, 10m, 0) from jsons1 where jtag->'tag1'>1")
|
||||
tdSql.checkData(0, 0, -2)
|
||||
tdSql.query("select spread(dataint) from jsons1 where jtag->'tag1'>1")
|
||||
|
@ -608,14 +633,14 @@ class TDTestCase:
|
|||
tdSql.checkRows(1)
|
||||
|
||||
# function not ready
|
||||
# tdSql.query("select tail(dataint,1) from jsons1 where jtag->'tag1'>1;")
|
||||
# tdSql.checkRows(3)
|
||||
# tdSql.query("select unique(dataint) from jsons1 where jtag->'tag1'>1;")
|
||||
# tdSql.checkRows(3)
|
||||
# tdSql.query("select mode(dataint) from jsons1 where jtag->'tag1'>1;")
|
||||
# tdSql.checkRows(3)
|
||||
# tdSql.query("select irate(dataint) from jsons1 where jtag->'tag1'>1;")
|
||||
# tdSql.checkRows(1)
|
||||
tdSql.query("select tail(dataint,1) from jsons1 where jtag->'tag1'>1;")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.query("select unique(dataint) from jsons1 where jtag->'tag1'>1;")
|
||||
tdSql.checkRows(3)
|
||||
tdSql.query("select mode(dataint) from jsons1 where jtag->'tag1'>1;")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.query("select irate(dataint) from jsons1 where jtag->'tag1'>1;")
|
||||
tdSql.checkRows(1)
|
||||
|
||||
#str function
|
||||
tdSql.query("select upper(dataStr) from jsons1 where jtag->'tag1'>1;")
|
||||
|
@ -659,13 +684,26 @@ class TDTestCase:
|
|||
tdSql.query("select ELAPSED(ts,1h) from jsons1 where jtag->'tag1'>1;")
|
||||
tdSql.checkRows(1)
|
||||
|
||||
#
|
||||
# #test TD-12077
|
||||
# to_json()
|
||||
tdSql.query("select to_json('{\"abc\":123}') from jsons1_1")
|
||||
tdSql.checkRows(2)
|
||||
# tdSql.checkData(0, 0, '{"abc":123}')
|
||||
# tdSql.checkData(1, 0, '{"abc":123}')
|
||||
tdSql.query("select to_json('null') from jsons1_1")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkData(0, 0, 'null')
|
||||
tdSql.checkData(1, 0, 'null')
|
||||
tdSql.query("select to_json('{\"key\"}') from jsons1_1")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkData(0, 0, 'null')
|
||||
tdSql.checkData(1, 0, 'null')
|
||||
|
||||
#test TD-12077
|
||||
tdSql.execute("insert into jsons1_16 using jsons1 tags('{\"tag1\":\"收到货\",\"tag2\":\"\",\"tag3\":-2.111}') values(1591062628000, 2, NULL, '你就会', 'dws')")
|
||||
tdSql.query("select jtag->'tag3' from jsons1_16")
|
||||
tdSql.checkData(0, 0, '-2.111000000')
|
||||
|
||||
# # test TD-12452
|
||||
# test TD-12452
|
||||
tdSql.execute("ALTER TABLE jsons1_1 SET TAG jtag=NULL")
|
||||
tdSql.query("select jtag from jsons1_1")
|
||||
tdSql.checkData(0, 0, None)
|
||||
|
|
|
@ -63,7 +63,7 @@ python3 ./test.py -f 2-query/To_unixtimestamp.py
|
|||
python3 ./test.py -f 2-query/timetruncate.py
|
||||
python3 ./test.py -f 2-query/diff.py
|
||||
python3 ./test.py -f 2-query/Timediff.py
|
||||
#python3 ./test.py -f 2-query/json_tag.py
|
||||
python3 ./test.py -f 2-query/json_tag.py
|
||||
|
||||
python3 ./test.py -f 2-query/top.py
|
||||
python3 ./test.py -f 2-query/bottom.py
|
||||
|
|
|
@ -482,6 +482,7 @@ int32_t shellReadCommand(char *command) {
|
|||
#endif
|
||||
break;
|
||||
case 4: // EOF or Ctrl+D
|
||||
taosResetTerminalMode();
|
||||
printf("\n");
|
||||
return -1;
|
||||
case 5: // ctrl E
|
||||
|
|
Loading…
Reference in New Issue