Merge remote-tracking branch 'origin/3.0' into feature/node

This commit is contained in:
Shengliang Guan 2022-04-11 19:47:39 +08:00
commit dd155fda16
10 changed files with 183 additions and 109 deletions

View File

@ -238,9 +238,9 @@ void initMsgHandleFp();
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
uint16_t port); uint16_t port);
void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr); void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows); int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows, bool convertUcs4);
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest); int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest);

View File

@ -13,7 +13,7 @@
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet); static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest); static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody); static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
static int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp); static int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4);
static bool stringLengthCheck(const char* str, size_t maxsize) { static bool stringLengthCheck(const char* str, size_t maxsize) {
if (str == NULL) { if (str == NULL) {
@ -176,7 +176,7 @@ int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
SRetrieveTableRsp* pRsp = NULL; SRetrieveTableRsp* pRsp = NULL;
int32_t code = qExecCommand(pQuery->pRoot, &pRsp); int32_t code = qExecCommand(pQuery->pRoot, &pRsp);
if (TSDB_CODE_SUCCESS == code && NULL != pRsp) { if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp); code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false);
} }
return code; return code;
} }
@ -616,7 +616,7 @@ static void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
} }
} }
void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr) { void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
assert(pRequest != NULL); assert(pRequest != NULL);
SReqResultInfo* pResultInfo = &pRequest->body.resInfo; SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
@ -637,7 +637,7 @@ void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr) {
return NULL; return NULL;
} }
pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData); pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData, convertUcs4);
if (pRequest->code != TSDB_CODE_SUCCESS) { if (pRequest->code != TSDB_CODE_SUCCESS) {
pResultInfo->numOfRows = 0; pResultInfo->numOfRows = 0;
return NULL; return NULL;
@ -735,7 +735,42 @@ static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) { static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int32_t numOfCols, int32_t* colLength) {
for (int32_t i = 0; i < numOfCols; ++i) {
int32_t type = pResultInfo->fields[i].type;
int32_t bytes = pResultInfo->fields[i].bytes;
if (type == TSDB_DATA_TYPE_NCHAR) {
char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pResultInfo->convertBuf[i] = p;
SResultColumn* pCol = &pResultInfo->pCol[i];
for (int32_t j = 0; j < numOfRows; ++j) {
if (pCol->offset[j] != -1) {
char* pStart = pCol->offset[j] + pCol->pData;
int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(p));
ASSERT(len <= bytes);
varDataSetLen(p, len);
pCol->offset[j] = (p - pResultInfo->convertBuf[i]);
p += (len + VARSTR_HEADER_SIZE);
}
}
pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
pResultInfo->row[i] = pResultInfo->pCol[i].pData;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows, bool convertUcs4) {
assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL); assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
if (numOfRows == 0) { if (numOfRows == 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -767,37 +802,11 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
} }
// convert UCS4-LE encoded character to native multi-bytes character in current data block. // convert UCS4-LE encoded character to native multi-bytes character in current data block.
for (int32_t i = 0; i < numOfCols; ++i) { if (convertUcs4) {
int32_t type = pResultInfo->fields[i].type; code = doConvertUCS4(pResultInfo, numOfRows, numOfCols, colLength);
int32_t bytes = pResultInfo->fields[i].bytes;
if (type == TSDB_DATA_TYPE_NCHAR) {
char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pResultInfo->convertBuf[i] = p;
SResultColumn* pCol = &pResultInfo->pCol[i];
for (int32_t j = 0; j < numOfRows; ++j) {
if (pCol->offset[j] != -1) {
pStart = pCol->offset[j] + pCol->pData;
int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(p));
ASSERT(len <= bytes);
varDataSetLen(p, len);
pCol->offset[j] = (p - pResultInfo->convertBuf[i]);
p += (len + VARSTR_HEADER_SIZE);
}
} }
pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i]; return code;
pResultInfo->row[i] = pResultInfo->pCol[i].pData;
}
}
return TSDB_CODE_SUCCESS;
} }
char* getDbOfConnection(STscObj* pObj) { char* getDbOfConnection(STscObj* pObj) {
@ -829,7 +838,7 @@ void resetConnectDB(STscObj* pTscObj) {
taosThreadMutexUnlock(&pTscObj->mutex); taosThreadMutexUnlock(&pTscObj->mutex);
} }
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) { int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4) {
assert(pResultInfo != NULL && pRsp != NULL); assert(pResultInfo != NULL && pRsp != NULL);
pResultInfo->pRspMsg = (const char*)pRsp; pResultInfo->pRspMsg = (const char*)pRsp;
@ -842,5 +851,5 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableR
// TODO handle the compressed case // TODO handle the compressed case
pResultInfo->totalRows += pResultInfo->numOfRows; pResultInfo->totalRows += pResultInfo->numOfRows;
return setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows); return setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows, convertUcs4);
} }

View File

@ -168,7 +168,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
return NULL; return NULL;
} }
return doFetchRow(pRequest, true); return doFetchRow(pRequest, true, true);
} }
int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) { int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) {
@ -404,7 +404,7 @@ int taos_fetch_block_s(TAOS_RES *res, int* numOfRows, TAOS_ROW *rows) {
return 0; return 0;
} }
doFetchRow(pRequest, false); doFetchRow(pRequest, false, true);
// TODO refactor // TODO refactor
SReqResultInfo *pResultInfo = &pRequest->body.resInfo; SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
@ -426,7 +426,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int* numOfRows, void** pData) {
return 0; return 0;
} }
doFetchRow(pRequest, false); doFetchRow(pRequest, false, false);
SReqResultInfo *pResultInfo = &pRequest->body.resInfo; SReqResultInfo *pResultInfo = &pRequest->body.resInfo;

View File

@ -191,7 +191,7 @@ int32_t processRetrieveMnodeRsp(void* param, const SDataBuf* pMsg, int32_t code)
pResInfo->completed = pRetrieve->completed; pResInfo->completed = pRetrieve->completed;
pResInfo->current = 0; pResInfo->current = 0;
setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows); // setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows);
tscDebug("0x%"PRIx64" numOfRows:%d, complete:%d, qId:0x%"PRIx64, pRequest->self, pRetrieve->numOfRows, tscDebug("0x%"PRIx64" numOfRows:%d, complete:%d, qId:0x%"PRIx64, pRequest->self, pRetrieve->numOfRows,
pRetrieve->completed, pRequest->body.showInfo.execId); pRetrieve->completed, pRequest->body.showInfo.execId);
@ -225,7 +225,7 @@ int32_t processRetrieveVndRsp(void* param, const SDataBuf* pMsg, int32_t code) {
pResInfo->pData = pFetchRsp->data; pResInfo->pData = pFetchRsp->data;
pResInfo->current = 0; pResInfo->current = 0;
setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows); // setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows);
tscDebug("0x%"PRIx64" numOfRows:%d, complete:%d, qId:0x%"PRIx64, pRequest->self, pFetchRsp->numOfRows, tscDebug("0x%"PRIx64" numOfRows:%d, complete:%d, qId:0x%"PRIx64, pRequest->self, pFetchRsp->numOfRows,
pFetchRsp->completed, pRequest->body.showInfo.execId); pFetchRsp->completed, pRequest->body.showInfo.execId);

View File

@ -232,9 +232,11 @@ typedef struct STaskAttr {
} STaskAttr; } STaskAttr;
struct SOperatorInfo; struct SOperatorInfo;
struct SAggSupporter;
struct SOptrBasicInfo;
typedef void (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char **result, int32_t *length); typedef void (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, struct SAggSupporter *pSup, struct SOptrBasicInfo *pInfo, char **result, int32_t *length);
typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char *result, int32_t length); typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, struct SAggSupporter *pSup, struct SOptrBasicInfo *pInfo, char *result, int32_t length);
typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr); typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr);
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr, bool* newgroup); typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr, bool* newgroup);
@ -753,6 +755,9 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, EOPTR_EXEC_MODEL model); int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, EOPTR_EXEC_MODEL model);
int32_t getOperatorExplainExecInfo(SOperatorInfo *operatorInfo, SExplainExecInfo **pRes, int32_t *capacity, int32_t *resNum); int32_t getOperatorExplainExecInfo(SOperatorInfo *operatorInfo, SExplainExecInfo **pRes, int32_t *capacity, int32_t *resNum);
bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasicInfo *pInfo, char* result, int32_t length);
void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasicInfo *pInfo, char **result, int32_t *length);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -300,10 +300,6 @@ SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) {
} }
taosArrayPush(pBlock->pDataBlock, &idata); taosArrayPush(pBlock->pDataBlock, &idata);
if (IS_VAR_DATA_TYPE(idata.info.type)) {
pBlock->info.hasVarCol = true;
}
} }
return pBlock; return pBlock;
@ -363,7 +359,7 @@ static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, jmp_buf env)
pResultRowInfo->pPosition = taosMemoryRealloc(pResultRowInfo->pPosition, newCapacity * sizeof(SResultRowPosition)); pResultRowInfo->pPosition = taosMemoryRealloc(pResultRowInfo->pPosition, newCapacity * sizeof(SResultRowPosition));
int32_t inc = (int32_t)newCapacity - pResultRowInfo->capacity; int32_t inc = (int32_t)newCapacity - pResultRowInfo->capacity;
memset(&pResultRowInfo->pPosition[pResultRowInfo->capacity], 0, sizeof(SResultRowPosition)); memset(&pResultRowInfo->pPosition[pResultRowInfo->capacity], 0, sizeof(SResultRowPosition) * inc);
pResultRowInfo->capacity = (int32_t)newCapacity; pResultRowInfo->capacity = (int32_t)newCapacity;
} }
@ -419,7 +415,7 @@ SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId,
pData = getBufPage(pResultBuf, getPageId(pi)); pData = getBufPage(pResultBuf, getPageId(pi));
pageId = getPageId(pi); pageId = getPageId(pi);
if (pData->num + interBufSize + sizeof(SResultRow) > getBufPageSize(pResultBuf)) { if (pData->num + interBufSize > getBufPageSize(pResultBuf)) {
// release current page first, and prepare the next one // release current page first, and prepare the next one
releaseBufPageInfo(pResultBuf, pi); releaseBufPageInfo(pResultBuf, pi);
@ -439,7 +435,7 @@ SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId,
pResultRow->pageId = pageId; pResultRow->pageId = pageId;
pResultRow->offset = (int32_t)pData->num; pResultRow->offset = (int32_t)pData->num;
pData->num += interBufSize + sizeof(SResultRow); pData->num += interBufSize;
return pResultRow; return pResultRow;
} }
@ -507,7 +503,7 @@ static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultR
// add a new result set for a new group // add a new result set for a new group
SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset}; SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos, POINTER_BYTES); taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos, sizeof(SResultRowPosition));
SResultRowCell cell = {.groupId = tableGroupId, .pos = pos}; SResultRowCell cell = {.groupId = tableGroupId, .pos = pos};
taosArrayPush(pSup->pResultRowArrayList, &cell); taosArrayPush(pSup->pResultRowArrayList, &cell);
} else { } else {
@ -4979,6 +4975,21 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
doAggregateImpl(pOperator, 0, pInfo->pCtx); doAggregateImpl(pOperator, 0, pInfo->pCtx);
#if 0 // test for encode/decode result info
if(pOperator->encodeResultRow){
char *result = NULL;
int32_t length = 0;
SAggSupporter *pSup = &pAggInfo->aggSup;
pOperator->encodeResultRow(pOperator, pSup, pInfo, &result, &length);
taosHashClear(pSup->pResultRowHashTable);
pInfo->resultRowInfo.size = 0;
pOperator->decodeResultRow(pOperator, pSup, pInfo, result, length);
if(result){
taosMemoryFree(result);
}
}
#endif
} }
finalizeQueryResult(pInfo->pCtx, pOperator->numOfOutput); finalizeQueryResult(pInfo->pCtx, pOperator->numOfOutput);
@ -5007,24 +5018,33 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator, bool* newgroup)
return (blockDataGetNumOfRows(pInfo->pRes) != 0) ? pInfo->pRes : NULL; return (blockDataGetNumOfRows(pInfo->pRes) != 0) ? pInfo->pRes : NULL;
} }
static void aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length) { void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasicInfo *pInfo, char **result, int32_t *length) {
SAggOperatorInfo* pAggInfo = pOperator->info;
SAggSupporter* pSup = &pAggInfo->aggSup;
int32_t size = taosHashGetSize(pSup->pResultRowHashTable); int32_t size = taosHashGetSize(pSup->pResultRowHashTable);
size_t keyLen = POINTER_BYTES; // estimate the key length size_t keyLen = sizeof(uint64_t) * 2; // estimate the key length
int32_t totalSize = sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize); int32_t totalSize = sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize);
*result = taosMemoryCalloc(1, totalSize); *result = taosMemoryCalloc(1, totalSize);
if (*result == NULL) { if (*result == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; longjmp(pOperator->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
return;
} }
*(int32_t*)(*result) = size; *(int32_t*)(*result) = size;
int32_t offset = sizeof(int32_t); int32_t offset = sizeof(int32_t);
// prepare memory
SResultRowPosition* pos = &pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.curPos];
void* pPage = getBufPage(pSup->pResultBuf, pos->pageId);
SResultRow* pRow = (SResultRow*)((char*)pPage + pos->offset);
setBufPageDirty(pPage, true);
releaseBufPage(pSup->pResultBuf, pPage);
void* pIter = taosHashIterate(pSup->pResultRowHashTable, NULL); void* pIter = taosHashIterate(pSup->pResultRowHashTable, NULL);
while (pIter) { while (pIter) {
void* key = taosHashGetKey(pIter, &keyLen); void* key = taosHashGetKey(pIter, &keyLen);
SResultRow** p1 = (SResultRow**)pIter; SResultRowPosition* p1 = (SResultRowPosition*)pIter;
pPage = (SFilePage*) getBufPage(pSup->pResultBuf, p1->pageId);
pRow = (SResultRow*)((char*)pPage + p1->offset);
setBufPageDirty(pPage, true);
releaseBufPage(pSup->pResultBuf, pPage);
// recalculate the result size // recalculate the result size
int32_t realTotalSize = offset + sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize; int32_t realTotalSize = offset + sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize;
@ -5034,7 +5054,7 @@ static void aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t*
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(*result); taosMemoryFree(*result);
*result = NULL; *result = NULL;
return; longjmp(pOperator->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
} else { } else {
*result = tmp; *result = tmp;
} }
@ -5048,7 +5068,7 @@ static void aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t*
// save value // save value
*(int32_t*)(*result + offset) = pSup->resultRowSize; *(int32_t*)(*result + offset) = pSup->resultRowSize;
offset += sizeof(int32_t); offset += sizeof(int32_t);
memcpy(*result + offset, *p1, pSup->resultRowSize); memcpy(*result + offset, pRow, pSup->resultRowSize);
offset += pSup->resultRowSize; offset += pSup->resultRowSize;
pIter = taosHashIterate(pSup->pResultRowHashTable, pIter); pIter = taosHashIterate(pSup->pResultRowHashTable, pIter);
@ -5060,15 +5080,11 @@ static void aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t*
return; return;
} }
static bool aggDecodeResultRow(SOperatorInfo* pOperator, char* result, int32_t length) { bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasicInfo *pInfo, char* result, int32_t length) {
if (!result || length <= 0) { if (!result || length <= 0) {
return false; return false;
} }
SAggOperatorInfo* pAggInfo = pOperator->info;
SAggSupporter* pSup = &pAggInfo->aggSup;
SOptrBasicInfo* pInfo = &pAggInfo->binfo;
// int32_t size = taosHashGetSize(pSup->pResultRowHashTable); // int32_t size = taosHashGetSize(pSup->pResultRowHashTable);
int32_t count = *(int32_t*)(result); int32_t count = *(int32_t*)(result);
@ -5080,17 +5096,16 @@ static bool aggDecodeResultRow(SOperatorInfo* pOperator, char* result, int32_t l
uint64_t tableGroupId = *(uint64_t*)(result + offset); uint64_t tableGroupId = *(uint64_t*)(result + offset);
SResultRow* resultRow = getNewResultRow_rv(pSup->pResultBuf, tableGroupId, pSup->resultRowSize); SResultRow* resultRow = getNewResultRow_rv(pSup->pResultBuf, tableGroupId, pSup->resultRowSize);
if (!resultRow) { if (!resultRow) {
terrno = TSDB_CODE_TSC_INVALID_INPUT; longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_INVALID_INPUT);
return false;
} }
// add a new result set for a new group // add a new result set for a new group
taosHashPut(pSup->pResultRowHashTable, result + offset, keyLen, &resultRow, POINTER_BYTES); SResultRowPosition pos = {.pageId = resultRow->pageId, .offset = resultRow->offset};
taosHashPut(pSup->pResultRowHashTable, result + offset, keyLen, &pos, sizeof(SResultRowPosition));
offset += keyLen; offset += keyLen;
int32_t valueLen = *(int32_t*)(result + offset); int32_t valueLen = *(int32_t*)(result + offset);
if (valueLen != pSup->resultRowSize) { if (valueLen != pSup->resultRowSize) {
terrno = TSDB_CODE_TSC_INVALID_INPUT; longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_INVALID_INPUT);
return false;
} }
offset += sizeof(int32_t); offset += sizeof(int32_t);
int32_t pageId = resultRow->pageId; int32_t pageId = resultRow->pageId;
@ -5101,13 +5116,13 @@ static bool aggDecodeResultRow(SOperatorInfo* pOperator, char* result, int32_t l
offset += valueLen; offset += valueLen;
initResultRow(resultRow); initResultRow(resultRow);
pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.size++] = prepareResultListBuffer(&pInfo->resultRowInfo, pOperator->pTaskInfo->env);
(SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset}; pInfo->resultRowInfo.curPos = pInfo->resultRowInfo.size;
pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.size++] = (SResultRowPosition) {.pageId = resultRow->pageId, .offset = resultRow->offset};
} }
if (offset != length) { if (offset != length) {
terrno = TSDB_CODE_TSC_INVALID_INPUT; longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_INVALID_INPUT);
return false;
} }
return true; return true;
} }
@ -5333,6 +5348,21 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order); setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order);
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0);
#if 0 // test for encode/decode result info
if(pOperator->encodeResultRow){
char *result = NULL;
int32_t length = 0;
SAggSupporter *pSup = &pInfo->aggSup;
pOperator->encodeResultRow(pOperator, pSup, &pInfo->binfo, &result, &length);
taosHashClear(pSup->pResultRowHashTable);
pInfo->binfo.resultRowInfo.size = 0;
pOperator->decodeResultRow(pOperator, pSup, &pInfo->binfo, result, length);
if(result){
taosMemoryFree(result);
}
}
#endif
} }
closeAllResultRows(&pInfo->binfo.resultRowInfo); closeAllResultRows(&pInfo->binfo.resultRowInfo);
@ -5752,7 +5782,6 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup)
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
} }
static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, bool* newgroup, static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, bool* newgroup,
SExecTaskInfo* pTaskInfo) { SExecTaskInfo* pTaskInfo) {
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
@ -6218,6 +6247,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator->getNextFn = doBuildIntervalResult; pOperator->getNextFn = doBuildIntervalResult;
pOperator->getStreamResFn= doStreamIntervalAgg; pOperator->getStreamResFn= doStreamIntervalAgg;
pOperator->closeFn = destroyIntervalOperatorInfo; pOperator->closeFn = destroyIntervalOperatorInfo;
pOperator->encodeResultRow = aggEncodeResultRow;
pOperator->decodeResultRow = aggDecodeResultRow;
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -6287,6 +6318,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->getNextFn = doStateWindowAgg; pOperator->getNextFn = doStateWindowAgg;
pOperator->closeFn = destroyStateWindowOperatorInfo; pOperator->closeFn = destroyStateWindowOperatorInfo;
pOperator->encodeResultRow = aggEncodeResultRow;
pOperator->decodeResultRow = aggDecodeResultRow;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
@ -6327,6 +6360,8 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->getNextFn = doSessionWindowAgg; pOperator->getNextFn = doSessionWindowAgg;
pOperator->closeFn = destroySWindowOperatorInfo; pOperator->closeFn = destroySWindowOperatorInfo;
pOperator->encodeResultRow = aggEncodeResultRow;
pOperator->decodeResultRow = aggDecodeResultRow;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
@ -6379,6 +6414,7 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun
pOperator->name = "AllMultiTableTimeIntervalOperator"; pOperator->name = "AllMultiTableTimeIntervalOperator";
// pOperator->operatorType = OP_AllMultiTableTimeInterval; // pOperator->operatorType = OP_AllMultiTableTimeInterval;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;

View File

@ -166,7 +166,7 @@ static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) {
// assign the group keys or user input constant values if required // assign the group keys or user input constant values if required
static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t totalRows, int32_t rowIndex) { static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t totalRows, int32_t rowIndex) {
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
if (pCtx[i].functionId == -1) { if (pCtx[i].functionId == -1) { // select count(*),key from t group by key.
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[i]); SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[i]);
SColumnInfoData* pColInfoData = pCtx[i].input.pData[0]; SColumnInfoData* pColInfoData = pCtx[i].input.pData[0];
@ -352,6 +352,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
pOperator->_openFn = operatorDummyOpenFn; pOperator->_openFn = operatorDummyOpenFn;
pOperator->getNextFn = hashGroupbyAggregate; pOperator->getNextFn = hashGroupbyAggregate;
pOperator->closeFn = destroyGroupOperatorInfo; pOperator->closeFn = destroyGroupOperatorInfo;
pOperator->encodeResultRow = aggEncodeResultRow;
pOperator->decodeResultRow = aggDecodeResultRow;
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;

View File

@ -1748,7 +1748,6 @@ int32_t fltInitValFieldData(SFilterInfo *info) {
SFilterField* fi = right; SFilterField* fi = right;
SValueNode* var = (SValueNode *)fi->desc; SValueNode* var = (SValueNode *)fi->desc;
if (var == NULL) { if (var == NULL) {
assert(fi->data != NULL); assert(fi->data != NULL);
continue; continue;
@ -1767,13 +1766,18 @@ int32_t fltInitValFieldData(SFilterInfo *info) {
} }
SDataType *dType = &var->node.resType; SDataType *dType = &var->node.resType;
size_t bytes = 0;
if (type == TSDB_DATA_TYPE_BINARY) { if (type == TSDB_DATA_TYPE_BINARY) {
size_t len = (dType->type == TSDB_DATA_TYPE_BINARY || dType->type == TSDB_DATA_TYPE_NCHAR) ? dType->bytes : MAX_NUM_STR_SIZE; size_t len = (dType->type == TSDB_DATA_TYPE_BINARY || dType->type == TSDB_DATA_TYPE_NCHAR) ? dType->bytes : MAX_NUM_STR_SIZE;
fi->data = taosMemoryCalloc(1, len + 1 + VARSTR_HEADER_SIZE); bytes = len + 1 + VARSTR_HEADER_SIZE;
fi->data = taosMemoryCalloc(1, bytes);
} else if (type == TSDB_DATA_TYPE_NCHAR) { } else if (type == TSDB_DATA_TYPE_NCHAR) {
size_t len = (dType->type == TSDB_DATA_TYPE_BINARY || dType->type == TSDB_DATA_TYPE_NCHAR) ? dType->bytes : MAX_NUM_STR_SIZE; size_t len = (dType->type == TSDB_DATA_TYPE_BINARY || dType->type == TSDB_DATA_TYPE_NCHAR) ? dType->bytes : MAX_NUM_STR_SIZE;
fi->data = taosMemoryCalloc(1, (len + 1) * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE); bytes = (len + 1) * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
fi->data = taosMemoryCalloc(1, bytes);
} else if (type != TSDB_DATA_TYPE_JSON){ } else if (type != TSDB_DATA_TYPE_JSON){
if (dType->type == TSDB_DATA_TYPE_VALUE_ARRAY) { //TIME RANGE if (dType->type == TSDB_DATA_TYPE_VALUE_ARRAY) { //TIME RANGE
/* /*
@ -1797,8 +1801,11 @@ int32_t fltInitValFieldData(SFilterInfo *info) {
} else { } else {
SScalarParam out = {.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData))}; SScalarParam out = {.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData))};
out.columnData->info.type = type; out.columnData->info.type = type;
if (IS_VAR_DATA_TYPE(type)) {
out.columnData->info.bytes = bytes;
} else {
out.columnData->info.bytes = tDataTypes[type].bytes; out.columnData->info.bytes = tDataTypes[type].bytes;
ASSERT(!IS_VAR_DATA_TYPE(type)); }
// todo refactor the convert // todo refactor the convert
int32_t code = doConvertDataType(var, &out); int32_t code = doConvertDataType(var, &out);
@ -2985,13 +2992,13 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDa
for (int32_t i = 0; i < numOfRows; ++i) { for (int32_t i = 0; i < numOfRows; ++i) {
uint32_t uidx = info->groups[0].unitIdxs[0]; uint32_t uidx = info->groups[0].unitIdxs[0];
void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i); void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i);
if (colData == NULL || colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL)) { if (colData == NULL || colDataIsNull_s((SColumnInfoData *)info->cunits[uidx].colData, i)) {
(*p)[i] = 0; (*p)[i] = 0;
all = false; all = false;
continue; continue;
} }
// match/nmatch for nchar type need convert from ucs4 to mbs
// match/nmatch for nchar type need convert from ucs4 to mbs
if(info->cunits[uidx].dataType == TSDB_DATA_TYPE_NCHAR && (info->cunits[uidx].optr == OP_TYPE_MATCH || info->cunits[uidx].optr == OP_TYPE_NMATCH)){ if(info->cunits[uidx].dataType == TSDB_DATA_TYPE_NCHAR && (info->cunits[uidx].optr == OP_TYPE_MATCH || info->cunits[uidx].optr == OP_TYPE_NMATCH)){
char *newColData = taosMemoryCalloc(info->cunits[uidx].dataSize * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE, 1); char *newColData = taosMemoryCalloc(info->cunits[uidx].dataSize * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE, 1);
int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(colData), varDataLen(colData), varDataVal(newColData)); int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(colData), varDataLen(colData), varDataVal(newColData));
@ -3222,19 +3229,13 @@ int32_t fltInitFromNode(SNode* tree, SFilterInfo *info, uint32_t options) {
info->unitFlags = taosMemoryMalloc(info->unitNum * sizeof(*info->unitFlags)); info->unitFlags = taosMemoryMalloc(info->unitNum * sizeof(*info->unitFlags));
filterDumpInfoToString(info, "Final", 0); filterDumpInfoToString(info, "Final", 0);
return code; return code;
_return: _return:
qInfo("init from node failed, code:%d", code); qInfo("init from node failed, code:%d", code);
return code; return code;
} }
bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t numOfCols, int32_t numOfRows) { bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t numOfCols, int32_t numOfRows) {
if (FILTER_EMPTY_RES(info)) { if (FILTER_EMPTY_RES(info)) {
return false; return false;

View File

@ -46,7 +46,6 @@ int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out) {
colDataAppend(in.columnData, 0, nodesGetValueFromNode(pValueNode), false); colDataAppend(in.columnData, 0, nodesGetValueFromNode(pValueNode), false);
colInfoDataEnsureCapacity(out->columnData, 1); colInfoDataEnsureCapacity(out->columnData, 1);
int32_t code = vectorConvertImpl(&in, out); int32_t code = vectorConvertImpl(&in, out);
sclFreeParam(&in); sclFreeParam(&in);

View File

@ -177,9 +177,24 @@ static FORCE_INLINE void varToBool(char *buf, SScalarParam* pOut, int32_t rowInd
colDataAppendInt8(pOut->columnData, rowIndex, (int8_t*) &v); colDataAppendInt8(pOut->columnData, rowIndex, (int8_t*) &v);
} }
static FORCE_INLINE void varToNchar(char* buf, SScalarParam* pOut, int32_t rowIndex) {
int32_t len = 0;
int32_t inputLen = varDataLen(buf);
char* t = taosMemoryCalloc(1,(inputLen + 1) * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE);
/*int32_t resLen = */taosMbsToUcs4(varDataVal(buf), inputLen, (TdUcs4*) varDataVal(t), pOut->columnData->info.bytes, &len);
varDataSetLen(t, len);
colDataAppend(pOut->columnData, rowIndex, t, false);
taosMemoryFree(t);
}
//TODO opt performance, tmp is not needed.
int32_t vectorConvertFromVarData(const SScalarParam* pIn, SScalarParam* pOut, int32_t inType, int32_t outType) { int32_t vectorConvertFromVarData(const SScalarParam* pIn, SScalarParam* pOut, int32_t inType, int32_t outType) {
int32_t bufSize = pIn->columnData->info.bytes; int32_t bufSize = pIn->columnData->info.bytes;
char *tmp = taosMemoryMalloc(bufSize); char *tmp = taosMemoryMalloc(bufSize + VARSTR_HEADER_SIZE);
bool vton = false;
_bufConverteFunc func = NULL; _bufConverteFunc func = NULL;
if (TSDB_DATA_TYPE_BOOL == outType) { if (TSDB_DATA_TYPE_BOOL == outType) {
@ -190,6 +205,9 @@ int32_t vectorConvertFromVarData(const SScalarParam* pIn, SScalarParam* pOut, in
func = varToUnsigned; func = varToUnsigned;
} else if (IS_FLOAT_TYPE(outType)) { } else if (IS_FLOAT_TYPE(outType)) {
func = varToFloat; func = varToFloat;
} else if (outType == TSDB_DATA_TYPE_NCHAR) {
func = varToNchar;
vton = true;
} else { } else {
sclError("invalid convert outType:%d", outType); sclError("invalid convert outType:%d", outType);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
@ -197,20 +215,23 @@ int32_t vectorConvertFromVarData(const SScalarParam* pIn, SScalarParam* pOut, in
pOut->numOfRows = pIn->numOfRows; pOut->numOfRows = pIn->numOfRows;
for (int32_t i = 0; i < pIn->numOfRows; ++i) { for (int32_t i = 0; i < pIn->numOfRows; ++i) {
if (colDataIsNull(pIn->columnData, pIn->numOfRows, i, NULL)) { if (colDataIsNull_s(pIn->columnData, i)) {
colDataAppendNULL(pOut->columnData, i); colDataAppendNULL(pOut->columnData, i);
continue; continue;
} }
char* data = colDataGetData(pIn->columnData, i); char* data = colDataGetData(pIn->columnData, i);
if (TSDB_DATA_TYPE_BINARY == inType) { if (vton) {
memcpy(tmp, data, varDataTLen(data));
} else {
if (TSDB_DATA_TYPE_VARCHAR == inType) {
memcpy(tmp, varDataVal(data), varDataLen(data)); memcpy(tmp, varDataVal(data), varDataLen(data));
tmp[varDataLen(data)] = 0; tmp[varDataLen(data)] = 0;
} else { } else {
ASSERT (varDataLen(data) <= bufSize); ASSERT(varDataLen(data) <= bufSize);
int len = taosUcs4ToMbs((TdUcs4*)varDataVal(data), varDataLen(data), tmp); int len = taosUcs4ToMbs((TdUcs4 *)varDataVal(data), varDataLen(data), tmp);
if (len < 0){ if (len < 0) {
sclError("castConvert taosUcs4ToMbs error 1"); sclError("castConvert taosUcs4ToMbs error 1");
taosMemoryFreeClear(tmp); taosMemoryFreeClear(tmp);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
@ -218,6 +239,7 @@ int32_t vectorConvertFromVarData(const SScalarParam* pIn, SScalarParam* pOut, in
tmp[len] = 0; tmp[len] = 0;
} }
}
(*func)(tmp, pOut, i); (*func)(tmp, pOut, i);
} }