support udf with subquery
This commit is contained in:
parent
37b1ced568
commit
149a0d547d
|
@ -309,7 +309,7 @@ void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
|
|||
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
|
||||
void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp);
|
||||
int tscSetMgmtEpSetFromCfg(const char *first, const char *second, SRpcCorEpSet *corEpSet);
|
||||
int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupNameList, __async_cb_func_t fp);
|
||||
int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupNameList, SArray* pUdfList, __async_cb_func_t fp);
|
||||
|
||||
int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t length, SArray* pNameArray);
|
||||
|
||||
|
|
|
@ -804,7 +804,15 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
|||
return code;
|
||||
}
|
||||
|
||||
SArray *pUdfInfo = NULL;
|
||||
if (pQueryInfo->pUdfInfo) {
|
||||
pUdfInfo = taosArrayDup(pQueryInfo->pUdfInfo);
|
||||
}
|
||||
|
||||
pQueryInfo = pCmd->active;
|
||||
pQueryInfo->pUdfInfo = pUdfInfo;
|
||||
pQueryInfo->udfCopy = true;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1865,6 +1873,7 @@ void genUdfList(SArray* pUdfInfo, tSqlExpr *pNode) {
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
static int32_t checkForUdf(SSqlObj* pSql, SQueryInfo* pQueryInfo, SArray* pSelection) {
|
||||
if (pQueryInfo->pUdfInfo != NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -1889,6 +1898,7 @@ static int32_t checkForUdf(SSqlObj* pSql, SQueryInfo* pQueryInfo, SArray* pSelec
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
static SUdfInfo* isValidUdf(SArray* pUdfInfo, const char* name, int32_t len) {
|
||||
size_t t = taosArrayGetSize(pUdfInfo);
|
||||
|
@ -7119,11 +7129,6 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
|
|||
return code;
|
||||
}
|
||||
|
||||
code = checkForUdf(pSql, pQueryInfo, pSqlNode->pSelNodeList);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
bool isSTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
|
||||
if (validateSelectNodeList(&pSql->cmd, pQueryInfo, pSqlNode->pSelNodeList, isSTable, false, false) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||
|
@ -7564,6 +7569,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
|||
SArray* pVgroupList = NULL;
|
||||
SArray* plist = NULL;
|
||||
STableMeta* pTableMeta = NULL;
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
|
||||
|
||||
pCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
|
||||
|
@ -7636,9 +7642,44 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
size_t funcSize = 0;
|
||||
if (pInfo->funcs) {
|
||||
funcSize = taosArrayGetSize(pInfo->funcs);
|
||||
}
|
||||
|
||||
if (funcSize > 0) {
|
||||
for (size_t i = 0; i < funcSize; ++i) {
|
||||
SStrToken* t = taosArrayGet(pInfo->funcs, i);
|
||||
if (NULL == t) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (t->n >= TSDB_FUNC_NAME_LEN) {
|
||||
code = tscSQLSyntaxErrMsg(tscGetErrorMsgPayload(pCmd), "too long function name", t->z);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _end;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t functionId = isValidFunction(t->z, t->n);
|
||||
if (functionId < 0) {
|
||||
struct SUdfInfo info = {0};
|
||||
info.name = strndup(t->z, t->n);
|
||||
if (pQueryInfo->pUdfInfo == NULL) {
|
||||
pQueryInfo->pUdfInfo = taosArrayInit(4, sizeof(struct SUdfInfo));
|
||||
}
|
||||
|
||||
int32_t functionId = (int32_t)taosArrayGetSize(pQueryInfo->pUdfInfo) * (-1) - 1;
|
||||
info.functionId = functionId;
|
||||
|
||||
taosArrayPush(pQueryInfo->pUdfInfo, &info);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// load the table meta for a given table name list
|
||||
if (taosArrayGetSize(plist) > 0 || taosArrayGetSize(pVgroupList) > 0) {
|
||||
code = getMultiTableMetaFromMnode(pSql, plist, pVgroupList, tscTableMetaCallBack);
|
||||
if (taosArrayGetSize(plist) > 0 || taosArrayGetSize(pVgroupList) > 0 || (pQueryInfo->pUdfInfo && taosArrayGetSize(pQueryInfo->pUdfInfo) > 0)) {
|
||||
code = getMultiTableMetaFromMnode(pSql, plist, pVgroupList, pQueryInfo->pUdfInfo, tscTableMetaCallBack);
|
||||
}
|
||||
|
||||
_end:
|
||||
|
@ -7763,6 +7804,14 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SSqlObj* pS
|
|||
SQueryInfo* pSub = calloc(1, sizeof(SQueryInfo));
|
||||
tscInitQueryInfo(pSub);
|
||||
|
||||
SArray *pUdfInfo = NULL;
|
||||
if (pQueryInfo->pUdfInfo) {
|
||||
pUdfInfo = taosArrayDup(pQueryInfo->pUdfInfo);
|
||||
}
|
||||
|
||||
pSub->pUdfInfo = pUdfInfo;
|
||||
pSub->udfCopy = true;
|
||||
|
||||
int32_t code = validateSqlNode(pSql, p, pSub);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -7911,11 +7960,6 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
|
|||
int32_t type = isSTable? TSDB_QUERY_TYPE_STABLE_QUERY:TSDB_QUERY_TYPE_TABLE_QUERY;
|
||||
TSDB_QUERY_SET_TYPE(pQueryInfo->type, type);
|
||||
|
||||
code = checkForUdf(pSql, pQueryInfo, pSqlNode->pSelNodeList);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
// parse the group by clause in the first place
|
||||
if (validateGroupbyNode(pQueryInfo, pSqlNode->pGroupby, pCmd) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||
|
|
|
@ -2090,6 +2090,7 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
|
|||
SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp;
|
||||
pMultiMeta->numOfTables = htonl(pMultiMeta->numOfTables);
|
||||
pMultiMeta->numOfVgroup = htonl(pMultiMeta->numOfVgroup);
|
||||
pMultiMeta->numOfUdf = htonl(pMultiMeta->numOfUdf);
|
||||
|
||||
rsp += sizeof(SMultiTableMeta);
|
||||
|
||||
|
@ -2160,6 +2161,37 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
|
|||
pMsg += size;
|
||||
}
|
||||
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfo(pParentCmd);
|
||||
if (pMultiMeta->numOfUdf > 0) {
|
||||
assert(pQueryInfo->pUdfInfo != NULL);
|
||||
}
|
||||
|
||||
for(int32_t i = 0; i < pMultiMeta->numOfUdf; ++i) {
|
||||
SFunctionInfoMsg* pFunc = (SFunctionInfoMsg*) pMsg;
|
||||
|
||||
for(int32_t j = 0; j < pMultiMeta->numOfUdf; ++j) {
|
||||
SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, j);
|
||||
if (strcmp(pUdfInfo->name, pFunc->name) != 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pUdfInfo->content) {
|
||||
continue;
|
||||
}
|
||||
|
||||
pUdfInfo->resBytes = htons(pFunc->resBytes);
|
||||
pUdfInfo->resType = pFunc->resType;
|
||||
pUdfInfo->funcType = htonl(pFunc->funcType);
|
||||
pUdfInfo->contLen = htonl(pFunc->len);
|
||||
pUdfInfo->bufSize = htonl(pFunc->bufSize);
|
||||
|
||||
pUdfInfo->content = malloc(pUdfInfo->contLen);
|
||||
memcpy(pUdfInfo->content, pFunc->content, pUdfInfo->contLen);
|
||||
|
||||
pMsg += sizeof(SFunctionInfoMsg) + pUdfInfo->contLen;
|
||||
}
|
||||
}
|
||||
|
||||
pSql->res.code = TSDB_CODE_SUCCESS;
|
||||
pSql->res.numOfTotal = pMultiMeta->numOfTables;
|
||||
tscDebug("0x%"PRIx64" load multi-tableMeta from mnode, numOfTables:%d", pSql->self, pMultiMeta->numOfTables);
|
||||
|
@ -2543,7 +2575,7 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupNameList, __async_cb_func_t fp) {
|
||||
int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupNameList, SArray* pUdfList, __async_cb_func_t fp) {
|
||||
SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
|
||||
if (NULL == pNew) {
|
||||
tscError("0x%"PRIx64" failed to allocate sqlobj to get multiple table meta", pSql->self);
|
||||
|
@ -2556,8 +2588,9 @@ int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVg
|
|||
|
||||
int32_t numOfTable = (int32_t) taosArrayGetSize(pNameList);
|
||||
int32_t numOfVgroupList = (int32_t) taosArrayGetSize(pVgroupNameList);
|
||||
int32_t numOfUdf = pUdfList ? taosArrayGetSize(pUdfList) : 0;
|
||||
|
||||
int32_t size = (numOfTable + numOfVgroupList) * TSDB_TABLE_FNAME_LEN + sizeof(SMultiTableInfoMsg);
|
||||
int32_t size = (numOfTable + numOfVgroupList) * TSDB_TABLE_FNAME_LEN + TSDB_FUNC_NAME_LEN * numOfUdf + sizeof(SMultiTableInfoMsg);
|
||||
if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, size)) {
|
||||
tscError("0x%"PRIx64" malloc failed for payload to get table meta", pSql->self);
|
||||
tscFreeSqlObj(pNew);
|
||||
|
@ -2567,12 +2600,13 @@ int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVg
|
|||
SMultiTableInfoMsg* pInfo = (SMultiTableInfoMsg*) pNew->cmd.payload;
|
||||
pInfo->numOfTables = htonl((uint32_t) taosArrayGetSize(pNameList));
|
||||
pInfo->numOfVgroups = htonl((uint32_t) taosArrayGetSize(pVgroupNameList));
|
||||
pInfo->numOfUdfs = htonl(numOfUdf);
|
||||
|
||||
char* start = pInfo->tableNames;
|
||||
int32_t len = 0;
|
||||
for(int32_t i = 0; i < numOfTable; ++i) {
|
||||
char* name = taosArrayGetP(pNameList, i);
|
||||
if (i < numOfTable - 1 || numOfVgroupList > 0) {
|
||||
if (i < numOfTable - 1 || numOfVgroupList > 0 || numOfUdf > 0) {
|
||||
len = sprintf(start, "%s,", name);
|
||||
} else {
|
||||
len = sprintf(start, "%s", name);
|
||||
|
@ -2583,7 +2617,7 @@ int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVg
|
|||
|
||||
for(int32_t i = 0; i < numOfVgroupList; ++i) {
|
||||
char* name = taosArrayGetP(pVgroupNameList, i);
|
||||
if (i < numOfVgroupList - 1) {
|
||||
if (i < numOfVgroupList - 1 || numOfUdf > 0) {
|
||||
len = sprintf(start, "%s,", name);
|
||||
} else {
|
||||
len = sprintf(start, "%s", name);
|
||||
|
@ -2592,12 +2626,23 @@ int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVg
|
|||
start += len;
|
||||
}
|
||||
|
||||
for(int32_t i = 0; i < numOfUdf; ++i) {
|
||||
SUdfInfo * u = taosArrayGet(pUdfList, i);
|
||||
if (i < numOfUdf - 1) {
|
||||
len = sprintf(start, "%s,", u->name);
|
||||
} else {
|
||||
len = sprintf(start, "%s", u->name);
|
||||
}
|
||||
|
||||
start += len;
|
||||
}
|
||||
|
||||
pNew->cmd.payloadLen = (int32_t) ((start - pInfo->tableNames) + sizeof(SMultiTableInfoMsg));
|
||||
pNew->cmd.msgType = TSDB_MSG_TYPE_CM_TABLES_META;
|
||||
|
||||
registerSqlObj(pNew);
|
||||
tscDebug("0x%"PRIx64" new pSqlObj:0x%"PRIx64" to get %d tableMeta, vgroupInfo:%d, msg size:%d", pSql->self,
|
||||
pNew->self, numOfTable, numOfVgroupList, pNew->cmd.payloadLen);
|
||||
tscDebug("0x%"PRIx64" new pSqlObj:0x%"PRIx64" to get %d tableMeta, vgroupInfo:%d, udf:%d, msg size:%d", pSql->self,
|
||||
pNew->self, numOfTable, numOfVgroupList, numOfUdf, pNew->cmd.payloadLen);
|
||||
|
||||
pNew->fp = fp;
|
||||
pNew->param = (void *)pSql->self;
|
||||
|
|
|
@ -981,7 +981,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
|
|||
registerSqlObj(pSql);
|
||||
tscDebug("0x%"PRIx64" load multiple table meta, tableNameList: %s pObj:%p", pSql->self, tableNameList, pObj);
|
||||
|
||||
code = getMultiTableMetaFromMnode(pSql, plist, vgroupList, loadMultiTableMetaCallback);
|
||||
code = getMultiTableMetaFromMnode(pSql, plist, vgroupList, NULL, loadMultiTableMetaCallback);
|
||||
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||
code = TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -1265,7 +1265,7 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) {
|
|||
tfree(pUpQueryInfo);
|
||||
}
|
||||
|
||||
if (pCmd->subCmd) {
|
||||
if (pQueryInfo->udfCopy) {
|
||||
pQueryInfo->pUdfInfo = taosArrayDestroy(pQueryInfo->pUdfInfo);
|
||||
} else {
|
||||
pQueryInfo->pUdfInfo = tscDestroyUdfArrayList(pQueryInfo->pUdfInfo);
|
||||
|
@ -3045,6 +3045,14 @@ int32_t tscQueryInfoCopy(SQueryInfo* pQueryInfo, const SQueryInfo* pSrc) {
|
|||
tscAddTableMetaInfo(pQueryInfo, &p1->name, pMeta, p1->vgroupList, p1->tagColList, p1->pVgroupTables);
|
||||
}
|
||||
|
||||
SArray *pUdfInfo = NULL;
|
||||
if (pSrc->pUdfInfo) {
|
||||
pUdfInfo = taosArrayDup(pSrc->pUdfInfo);
|
||||
}
|
||||
|
||||
pQueryInfo->pUdfInfo = pUdfInfo;
|
||||
pQueryInfo->udfCopy = true;
|
||||
|
||||
_error:
|
||||
return code;
|
||||
}
|
||||
|
@ -3345,6 +3353,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
|
|||
|
||||
if (pQueryInfo->pUdfInfo) {
|
||||
pNewQueryInfo->pUdfInfo = taosArrayDup(pQueryInfo->pUdfInfo);
|
||||
pNewQueryInfo->udfCopy = true;
|
||||
}
|
||||
|
||||
pNewQueryInfo->command = pQueryInfo->command;
|
||||
|
|
|
@ -749,6 +749,7 @@ typedef struct {
|
|||
typedef struct {
|
||||
int32_t numOfVgroups;
|
||||
int32_t numOfTables;
|
||||
int32_t numOfUdfs;
|
||||
char tableNames[];
|
||||
} SMultiTableInfoMsg;
|
||||
|
||||
|
@ -799,6 +800,7 @@ typedef struct STableMetaMsg {
|
|||
typedef struct SMultiTableMeta {
|
||||
int32_t numOfTables;
|
||||
int32_t numOfVgroup;
|
||||
int32_t numOfUdf;
|
||||
int32_t contLen;
|
||||
char meta[];
|
||||
} SMultiTableMeta;
|
||||
|
|
|
@ -210,6 +210,9 @@
|
|||
#define TK_INTO 191
|
||||
#define TK_VALUES 192
|
||||
|
||||
|
||||
|
||||
|
||||
#define TK_SPACE 300
|
||||
#define TK_COMMENT 301
|
||||
#define TK_ILLEGAL 302
|
||||
|
|
|
@ -42,6 +42,7 @@
|
|||
#include "mnodeWrite.h"
|
||||
#include "mnodeRead.h"
|
||||
#include "mnodePeer.h"
|
||||
#include "mnodeFunc.h"
|
||||
|
||||
#define ALTER_CTABLE_RETRY_TIMES 3
|
||||
#define CREATE_CTABLE_RETRY_TIMES 10
|
||||
|
@ -2842,11 +2843,30 @@ static void mnodeProcessAlterTableRsp(SRpcMsg *rpcMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t calculateMultipleVgroupMsgLength(SArray* vlist) {
|
||||
int32_t contLen = 0;
|
||||
int32_t numOfTable = taosArrayGetSize(vlist);
|
||||
|
||||
for (int32_t i = 0; i < numOfTable; ++i) {
|
||||
char *stableName = taosArrayGetP(vlist, i);
|
||||
SSTableObj *pTable = mnodeGetSuperTable(stableName);
|
||||
if (pTable != NULL && pTable->vgHash != NULL) {
|
||||
contLen += TSDB_TABLE_NAME_LEN + (taosHashGetSize(pTable->vgHash) * sizeof(SVgroupMsg) + sizeof(SVgroupsMsg));
|
||||
}
|
||||
|
||||
mnodeDecTableRef(pTable);
|
||||
}
|
||||
|
||||
return contLen;
|
||||
}
|
||||
|
||||
|
||||
static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
|
||||
SMultiTableInfoMsg *pInfo = pMsg->rpcMsg.pCont;
|
||||
|
||||
pInfo->numOfTables = htonl(pInfo->numOfTables);
|
||||
pInfo->numOfVgroups = htonl(pInfo->numOfVgroups);
|
||||
pInfo->numOfUdfs = htonl(pInfo->numOfUdfs);
|
||||
|
||||
int32_t contLen = pMsg->rpcMsg.contLen - sizeof(SMultiTableInfoMsg);
|
||||
|
||||
|
@ -2857,14 +2877,14 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
|
|||
SArray* pList = taosArrayInit(4, POINTER_BYTES);
|
||||
SMultiTableMeta *pMultiMeta = NULL;
|
||||
|
||||
if (num != pInfo->numOfTables + pInfo->numOfVgroups) {
|
||||
if (num != pInfo->numOfTables + pInfo->numOfVgroups + pInfo->numOfUdfs) {
|
||||
mError("msg:%p, app:%p, failed to get multi-tableMeta, msg inconsistent", pMsg, pMsg->rpcMsg.ahandle);
|
||||
code = TSDB_CODE_MND_INVALID_TABLE_NAME;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
// first malloc 80KB, subsequent reallocation will expand the size as twice of the original size
|
||||
int32_t totalMallocLen = sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16);
|
||||
int32_t totalMallocLen = sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16) + (sizeof(SFunctionInfoMsg) + TSDB_FUNC_CODE_LEN) * pInfo->numOfUdfs + 16384;
|
||||
pMultiMeta = rpcMallocCont(totalMallocLen);
|
||||
if (pMultiMeta == NULL) {
|
||||
code = TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
|
@ -2932,10 +2952,9 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
char* msg = (char*) pMultiMeta + pMultiMeta->contLen;
|
||||
|
||||
int32_t tableNum = pInfo->numOfTables + pInfo->numOfVgroups;
|
||||
// add the additional super table names that needs the vgroup info
|
||||
for(;t < num; ++t) {
|
||||
for(;t < tableNum; ++t) {
|
||||
taosArrayPush(pList, &nameList[t]);
|
||||
}
|
||||
|
||||
|
@ -2943,6 +2962,22 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
|
|||
int32_t numOfVgroupList = (int32_t) taosArrayGetSize(pList);
|
||||
pMultiMeta->numOfVgroup = htonl(numOfVgroupList);
|
||||
|
||||
if (numOfVgroupList > 0) {
|
||||
int32_t remain = totalMallocLen - pMultiMeta->contLen;
|
||||
int32_t vsize = calculateMultipleVgroupMsgLength(pList);
|
||||
if (remain < vsize) {
|
||||
totalMallocLen += vsize;
|
||||
pMultiMeta = rpcReallocCont(pMultiMeta, totalMallocLen);
|
||||
if (pMultiMeta == NULL) {
|
||||
mnodeDecTableRef(pMsg->pTable);
|
||||
code = TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
goto _end;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
char* msg = (char*) pMultiMeta + pMultiMeta->contLen;
|
||||
|
||||
for(int32_t i = 0; i < numOfVgroupList; ++i) {
|
||||
char* name = taosArrayGetP(pList, i);
|
||||
|
||||
|
@ -2959,6 +2994,36 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
|
|||
pMultiMeta->contLen = (int32_t) (msg - (char*) pMultiMeta);
|
||||
|
||||
pMultiMeta->numOfTables = htonl(pMultiMeta->numOfTables);
|
||||
|
||||
for(int32_t i = 0; i < pInfo->numOfUdfs; ++i, ++t) {
|
||||
char buf[TSDB_FUNC_NAME_LEN] = {0};
|
||||
strcpy(buf, nameList[t]);
|
||||
|
||||
SFuncObj* pFuncObj = mnodeGetFunc(buf);
|
||||
if (pFuncObj == NULL) {
|
||||
mError("function %s does not exist", buf);
|
||||
code = TSDB_CODE_MND_INVALID_FUNC;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
SFunctionInfoMsg* pFuncInfo = (SFunctionInfoMsg*) msg;
|
||||
|
||||
strcpy(pFuncInfo->name, buf);
|
||||
pFuncInfo->len = htonl(pFuncObj->contLen);
|
||||
memcpy(pFuncInfo->content, pFuncObj->cont, pFuncObj->contLen);
|
||||
|
||||
pFuncInfo->funcType = htonl(pFuncObj->funcType);
|
||||
pFuncInfo->resType = pFuncObj->resType;
|
||||
pFuncInfo->resBytes = htons(pFuncObj->resBytes);
|
||||
pFuncInfo->bufSize = htonl(pFuncObj->bufSize);
|
||||
|
||||
msg += sizeof(SFunctionInfoMsg) + pFuncObj->contLen;
|
||||
}
|
||||
|
||||
pMultiMeta->contLen = (int32_t) (msg - (char*) pMultiMeta);
|
||||
|
||||
pMultiMeta->numOfUdf = htonl(pInfo->numOfUdfs);
|
||||
|
||||
pMsg->rpcRsp.rsp = pMultiMeta;
|
||||
pMsg->rpcRsp.len = pMultiMeta->contLen;
|
||||
code = TSDB_CODE_SUCCESS;
|
||||
|
@ -3417,4 +3482,4 @@ int32_t mnodeCompactTables() {
|
|||
mnodeCompactChildTables();
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -236,6 +236,7 @@ typedef struct SSqlInfo {
|
|||
bool valid;
|
||||
SArray *list; // todo refactor
|
||||
char msg[256];
|
||||
SArray *funcs;
|
||||
union {
|
||||
SCreateTableSql *pCreateTableInfo;
|
||||
SAlterTableInfo *pAlterInfo;
|
||||
|
@ -279,6 +280,7 @@ SRelationInfo *addSubqueryElem(SRelationInfo* pRelationInfo, SArray* pSub, SStrT
|
|||
// sql expr leaf node
|
||||
tSqlExpr *tSqlExprCreateIdValue(SStrToken *pToken, int32_t optrType);
|
||||
tSqlExpr *tSqlExprCreateFunction(SArray *pParam, SStrToken *pFuncToken, SStrToken *endToken, int32_t optType);
|
||||
SArray *tStrTokenAppend(SArray *pList, SStrToken *pToken);
|
||||
|
||||
tSqlExpr *tSqlExprCreate(tSqlExpr *pLeft, tSqlExpr *pRight, int32_t optrType);
|
||||
tSqlExpr *tSqlExprClone(tSqlExpr *pSrc);
|
||||
|
|
|
@ -123,6 +123,8 @@ typedef struct SQueryInfo {
|
|||
int32_t round; // 0/1/....
|
||||
int32_t bufLen;
|
||||
char* buf;
|
||||
|
||||
bool udfCopy;
|
||||
SArray *pUdfInfo;
|
||||
|
||||
struct SQInfo *pQInfo; // global merge operator
|
||||
|
|
|
@ -697,10 +697,10 @@ expr(A) ::= BOOL(X). { A = tSqlExprCreateIdValue(&X, TK_BOOL);}
|
|||
expr(A) ::= NULL(X). { A = tSqlExprCreateIdValue(&X, TK_NULL);}
|
||||
|
||||
// ordinary functions: min(x), max(x), top(k, 20)
|
||||
expr(A) ::= ID(X) LP exprlist(Y) RP(E). { A = tSqlExprCreateFunction(Y, &X, &E, X.type); }
|
||||
expr(A) ::= ID(X) LP exprlist(Y) RP(E). { tStrTokenAppend(pInfo->funcs, &X); A = tSqlExprCreateFunction(Y, &X, &E, X.type); }
|
||||
|
||||
// for parsing sql functions with wildcard for parameters. e.g., count(*)/first(*)/last(*) operation
|
||||
expr(A) ::= ID(X) LP STAR RP(Y). { A = tSqlExprCreateFunction(NULL, &X, &Y, X.type); }
|
||||
expr(A) ::= ID(X) LP STAR RP(Y). { tStrTokenAppend(pInfo->funcs, &X); A = tSqlExprCreateFunction(NULL, &X, &Y, X.type); }
|
||||
|
||||
// is (not) null expression
|
||||
expr(A) ::= expr(X) IS NULL. {A = tSqlExprCreate(X, NULL, TK_ISNULL);}
|
||||
|
|
|
@ -28,6 +28,7 @@ SSqlInfo qSqlParse(const char *pStr) {
|
|||
|
||||
SSqlInfo sqlInfo = {0};
|
||||
sqlInfo.valid = true;
|
||||
sqlInfo.funcs = taosArrayInit(4, sizeof(SStrToken));
|
||||
|
||||
int32_t i = 0;
|
||||
while (1) {
|
||||
|
@ -120,6 +121,19 @@ void tSqlExprListDestroy(SArray *pList) {
|
|||
taosArrayDestroyEx(pList, freeExprElem);
|
||||
}
|
||||
|
||||
|
||||
SArray *tStrTokenAppend(SArray *pList, SStrToken *pToken) {
|
||||
if (pList == NULL) {
|
||||
pList = taosArrayInit(4, sizeof(tVariantListItem));
|
||||
}
|
||||
|
||||
if (pToken) {
|
||||
taosArrayPush(pList, pToken);
|
||||
}
|
||||
|
||||
return pList;
|
||||
}
|
||||
|
||||
tSqlExpr *tSqlExprCreateIdValue(SStrToken *pToken, int32_t optrType) {
|
||||
tSqlExpr *pSqlExpr = calloc(1, sizeof(tSqlExpr));
|
||||
|
||||
|
|
2210
src/query/src/sql.c
2210
src/query/src/sql.c
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue