feature/qnode
This commit is contained in:
parent
0afdb145a4
commit
8cfcd6d4f4
|
@ -151,13 +151,14 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
taosMemoryFreeClear(output.dbVgroup);
|
||||
|
||||
tscError("failed to build use db output since %s", terrstr());
|
||||
} else {
|
||||
} else if (output.dbVgroup) {
|
||||
struct SCatalog* pCatalog = NULL;
|
||||
|
||||
int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
|
||||
if (code1 != TSDB_CODE_SUCCESS) {
|
||||
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId,
|
||||
tstrerror(code1));
|
||||
taosMemoryFreeClear(output.dbVgroup);
|
||||
} else {
|
||||
catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup);
|
||||
}
|
||||
|
|
|
@ -367,9 +367,9 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) {
|
|||
}
|
||||
dmReportStartup(pDnode, "dnode-transport", "initialized");
|
||||
|
||||
if (dmStartUdfd(pDnode) != 0) {
|
||||
dError("failed to start udfd");
|
||||
}
|
||||
// if (dmStartUdfd(pDnode) != 0) {
|
||||
// dError("failed to start udfd");
|
||||
// }
|
||||
|
||||
dInfo("dnode-mgmt is initialized");
|
||||
return 0;
|
||||
|
|
|
@ -30,6 +30,7 @@ SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup);
|
|||
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups);
|
||||
SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup);
|
||||
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId);
|
||||
int32_t mndGetGlobalVgroupVersion(int32_t *vgId);
|
||||
|
||||
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
|
||||
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
|
||||
|
|
|
@ -1191,6 +1191,7 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
|
|||
char *p = strchr(usedbReq.db, '.');
|
||||
if (p && 0 == strcmp(p + 1, TSDB_INFORMATION_SCHEMA_DB)) {
|
||||
memcpy(usedbRsp.db, usedbReq.db, TSDB_DB_FNAME_LEN);
|
||||
//mndGetGlobalVgroupVersion(); TODO
|
||||
static int32_t vgVersion = 1;
|
||||
if (usedbReq.vgVersion < vgVersion) {
|
||||
usedbRsp.pVgroupInfos = taosArrayInit(10, sizeof(SVgroupInfo));
|
||||
|
@ -1202,16 +1203,11 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
|
|||
mndBuildDBVgroupInfo(NULL, pMnode, usedbRsp.pVgroupInfos);
|
||||
usedbRsp.vgVersion = vgVersion++;
|
||||
|
||||
if (taosArrayGetSize(usedbRsp.pVgroupInfos) <= 0) {
|
||||
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
|
||||
} else {
|
||||
code = 0;
|
||||
}
|
||||
} else {
|
||||
usedbRsp.vgVersion = usedbReq.vgVersion;
|
||||
code = 0;
|
||||
}
|
||||
usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos);
|
||||
code = 0;
|
||||
|
||||
// no jump, need to construct rsp
|
||||
} else {
|
||||
|
|
|
@ -1388,6 +1388,14 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
|
|||
|
||||
void *pIter = taosHashIterate(mgmt->schHash, NULL);
|
||||
while (pIter) {
|
||||
SQWSchStatus *sch = (SQWSchStatus *)pIter;
|
||||
if (NULL == sch->hbConnInfo.handle) {
|
||||
uint64_t *sId = taosHashGetKey(pIter, NULL);
|
||||
QW_DLOG("cancel send hb to sch %" PRIx64 " cause of no connection handle", *sId);
|
||||
pIter = taosHashIterate(mgmt->schHash, pIter);
|
||||
continue;
|
||||
}
|
||||
|
||||
code = qwGenerateSchHbRsp(mgmt, (SQWSchStatus *)pIter, &rspList[i]);
|
||||
if (code) {
|
||||
taosHashCancelIterate(mgmt->schHash, pIter);
|
||||
|
|
|
@ -186,6 +186,7 @@ typedef struct SFilterColCtx {
|
|||
|
||||
typedef struct SFilterCompare {
|
||||
uint8_t type;
|
||||
int8_t precision;
|
||||
uint8_t optr;
|
||||
uint8_t optr2;
|
||||
} SFilterCompare;
|
||||
|
@ -218,6 +219,7 @@ typedef struct SFltTreeStat {
|
|||
int32_t code;
|
||||
int8_t precision;
|
||||
bool scalarMode;
|
||||
SArray* nodeList;
|
||||
SFilterInfo* info;
|
||||
} SFltTreeStat;
|
||||
|
||||
|
@ -303,6 +305,7 @@ struct SFilterInfo {
|
|||
#define FILTER_GET_FIELD(i, id) (&((i)->fields[(id).type].fields[(id).idx]))
|
||||
#define FILTER_GET_COL_FIELD(i, idx) (&((i)->fields[FLD_TYPE_COLUMN].fields[idx]))
|
||||
#define FILTER_GET_COL_FIELD_TYPE(fi) (((SColumnNode *)((fi)->desc))->node.resType.type)
|
||||
#define FILTER_GET_COL_FIELD_PRECISION(fi) (((SColumnNode *)((fi)->desc))->node.resType.precision)
|
||||
#define FILTER_GET_COL_FIELD_SIZE(fi) (((SColumnNode *)((fi)->desc))->node.resType.bytes)
|
||||
#define FILTER_GET_COL_FIELD_ID(fi) (((SColumnNode *)((fi)->desc))->colId)
|
||||
#define FILTER_GET_COL_FIELD_SLOT_ID(fi) (((SColumnNode *)((fi)->desc))->slotId)
|
||||
|
@ -317,6 +320,7 @@ struct SFilterInfo {
|
|||
#define FILTER_UNIT_RIGHT_FIELD(i, u) FILTER_GET_FIELD(i, (u)->right)
|
||||
#define FILTER_UNIT_RIGHT2_FIELD(i, u) FILTER_GET_FIELD(i, (u)->right2)
|
||||
#define FILTER_UNIT_DATA_TYPE(u) ((u)->compare.type)
|
||||
#define FILTER_UNIT_DATA_PRECISION(u) ((u)->compare.precision)
|
||||
#define FILTER_UNIT_COL_DESC(i, u) FILTER_GET_COL_FIELD_DESC(FILTER_UNIT_LEFT_FIELD(i, u))
|
||||
#define FILTER_UNIT_COL_DATA(i, u, ri) FILTER_GET_COL_FIELD_DATA(FILTER_UNIT_LEFT_FIELD(i, u), ri)
|
||||
#define FILTER_UNIT_COL_SIZE(i, u) FILTER_GET_COL_FIELD_SIZE(FILTER_UNIT_LEFT_FIELD(i, u))
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
#include "sclInt.h"
|
||||
#include "tcompare.h"
|
||||
#include "tdatablock.h"
|
||||
#include "ttime.h"
|
||||
|
||||
OptrStr gOptrStr[] = {
|
||||
{0, "invalid"},
|
||||
|
@ -986,6 +987,7 @@ int32_t filterAddUnit(SFilterInfo *info, uint8_t optr, SFilterFieldId *left, SFi
|
|||
assert(FILTER_GET_FLAG(col->flag, FLD_TYPE_COLUMN));
|
||||
|
||||
info->units[info->unitNum].compare.type = FILTER_GET_COL_FIELD_TYPE(col);
|
||||
info->units[info->unitNum].compare.precision = FILTER_GET_COL_FIELD_PRECISION(col);
|
||||
|
||||
*uidx = info->unitNum;
|
||||
|
||||
|
@ -1748,6 +1750,7 @@ int32_t fltInitValFieldData(SFilterInfo *info) {
|
|||
assert(FILTER_GET_FLAG(right->flag, FLD_TYPE_VALUE));
|
||||
|
||||
uint32_t type = FILTER_UNIT_DATA_TYPE(unit);
|
||||
int8_t precision = FILTER_UNIT_DATA_PRECISION(unit);
|
||||
SFilterField* fi = right;
|
||||
|
||||
SValueNode* var = (SValueNode *)fi->desc;
|
||||
|
@ -1801,6 +1804,7 @@ int32_t fltInitValFieldData(SFilterInfo *info) {
|
|||
} else {
|
||||
SScalarParam out = {.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData))};
|
||||
out.columnData->info.type = type;
|
||||
out.columnData->info.precision = precision;
|
||||
if (IS_VAR_DATA_TYPE(type)) {
|
||||
out.columnData->info.bytes = bytes;
|
||||
} else {
|
||||
|
@ -3475,6 +3479,33 @@ int32_t filterFreeNcharColumns(SFilterInfo* info) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t fltAddValueNodeToConverList(SFltTreeStat *stat, SValueNode* pNode) {
|
||||
if (NULL == stat->nodeList) {
|
||||
stat->nodeList = taosArrayInit(10, POINTER_BYTES);
|
||||
if (NULL == stat->nodeList) {
|
||||
FLT_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
}
|
||||
|
||||
if (NULL == taosArrayPush(stat->nodeList, &pNode)) {
|
||||
FLT_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void fltConvertToTsValueNode(SFltTreeStat *stat, SValueNode* valueNode) {
|
||||
char *timeStr = valueNode->datum.p;
|
||||
if (convertStringToTimestamp(valueNode->node.resType.type, valueNode->datum.p, stat->precision, &valueNode->datum.i) !=
|
||||
TSDB_CODE_SUCCESS) {
|
||||
valueNode->datum.i = 0;
|
||||
}
|
||||
taosMemoryFree(timeStr);
|
||||
|
||||
valueNode->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
valueNode->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes;
|
||||
}
|
||||
|
||||
EDealRes fltReviseRewriter(SNode** pNode, void* pContext) {
|
||||
SFltTreeStat *stat = (SFltTreeStat *)pContext;
|
||||
|
||||
|
@ -3504,25 +3535,23 @@ EDealRes fltReviseRewriter(SNode** pNode, void* pContext) {
|
|||
}
|
||||
|
||||
SValueNode *valueNode = (SValueNode *)*pNode;
|
||||
if (TSDB_DATA_TYPE_BINARY != valueNode->node.resType.type) {
|
||||
if (TSDB_DATA_TYPE_BINARY != valueNode->node.resType.type && TSDB_DATA_TYPE_NCHAR != valueNode->node.resType.type) {
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
#if 0
|
||||
if (stat->precision < 0) {
|
||||
//TODO
|
||||
int32_t code = fltAddValueNodeToConverList(stat, valueNode);
|
||||
if (code) {
|
||||
stat->code = code;
|
||||
return DEAL_RES_ERROR;
|
||||
}
|
||||
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
char *timeStr = valueNode->datum.p;
|
||||
if (taosParseTime(valueNode->datum.p, &valueNode->datum.i, valueNode->node.resType.bytes, stat->precision, tsDaylight) !=
|
||||
TSDB_CODE_SUCCESS) {
|
||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
|
||||
}
|
||||
TODO
|
||||
#else
|
||||
fltConvertToTsValueNode(stat, valueNode);
|
||||
|
||||
return DEAL_RES_CONTINUE;
|
||||
#endif
|
||||
}
|
||||
|
||||
if (QUERY_NODE_COLUMN == nodeType(*pNode)) {
|
||||
|
@ -3619,9 +3648,22 @@ EDealRes fltReviseRewriter(SNode** pNode, void* pContext) {
|
|||
}
|
||||
|
||||
int32_t fltReviseNodes(SFilterInfo *pInfo, SNode** pNode, SFltTreeStat *pStat) {
|
||||
int32_t code = 0;
|
||||
nodesRewriteExprPostOrder(pNode, fltReviseRewriter, (void *)pStat);
|
||||
|
||||
FLT_RET(pStat->code);
|
||||
FLT_ERR_JRET(pStat->code);
|
||||
|
||||
int32_t nodeNum = taosArrayGetSize(pStat->nodeList);
|
||||
for (int32_t i = 0; i < nodeNum; ++i) {
|
||||
SValueNode *valueNode = *(SValueNode **)taosArrayGet(pStat->nodeList, i);
|
||||
|
||||
fltConvertToTsValueNode(pStat, valueNode);
|
||||
}
|
||||
|
||||
_return:
|
||||
|
||||
taosArrayDestroy(pStat->nodeList);
|
||||
FLT_RET(code);
|
||||
}
|
||||
|
||||
int32_t fltOptimizeNodes(SFilterInfo *pInfo, SNode** pNode, SFltTreeStat *pStat) {
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
#include "tcompare.h"
|
||||
#include "tdatablock.h"
|
||||
#include "ttypes.h"
|
||||
#include "ttime.h"
|
||||
|
||||
#define LEFT_COL ((pLeftCol->info.type == TSDB_DATA_TYPE_JSON ? (void*)pLeftCol : pLeftCol->pData))
|
||||
#define RIGHT_COL ((pRightCol->info.type == TSDB_DATA_TYPE_JSON ? (void*)pRightCol : pRightCol->pData))
|
||||
|
@ -252,6 +253,15 @@ _getValueAddr_fn_t getVectorValueAddrFn(int32_t srcType) {
|
|||
return p;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void varToTimestamp(char *buf, SScalarParam* pOut, int32_t rowIndex) {
|
||||
int64_t value = 0;
|
||||
if (taosParseTime(buf, &value, strlen(buf), pOut->columnData->info.precision, tsDaylight) != TSDB_CODE_SUCCESS) {
|
||||
value = 0;
|
||||
}
|
||||
|
||||
colDataAppendInt64(pOut->columnData, rowIndex, &value);
|
||||
}
|
||||
|
||||
static FORCE_INLINE void varToSigned(char *buf, SScalarParam* pOut, int32_t rowIndex) {
|
||||
int64_t value = strtoll(buf, NULL, 10);
|
||||
colDataAppendInt64(pOut->columnData, rowIndex, &value);
|
||||
|
@ -295,7 +305,7 @@ int32_t vectorConvertFromVarData(const SScalarParam* pIn, SScalarParam* pOut, in
|
|||
_bufConverteFunc func = NULL;
|
||||
if (TSDB_DATA_TYPE_BOOL == outType) {
|
||||
func = varToBool;
|
||||
} else if (IS_SIGNED_NUMERIC_TYPE(outType) || TSDB_DATA_TYPE_TIMESTAMP == outType) {
|
||||
} else if (IS_SIGNED_NUMERIC_TYPE(outType)) {
|
||||
func = varToSigned;
|
||||
} else if (IS_UNSIGNED_NUMERIC_TYPE(outType)) {
|
||||
func = varToUnsigned;
|
||||
|
@ -305,6 +315,8 @@ int32_t vectorConvertFromVarData(const SScalarParam* pIn, SScalarParam* pOut, in
|
|||
ASSERT(inType == TSDB_DATA_TYPE_VARCHAR);
|
||||
func = varToNchar;
|
||||
vton = true;
|
||||
} else if (TSDB_DATA_TYPE_TIMESTAMP == outType) {
|
||||
func = varToTimestamp;
|
||||
} else {
|
||||
sclError("invalid convert outType:%d", outType);
|
||||
return TSDB_CODE_QRY_APP_ERROR;
|
||||
|
@ -594,8 +606,8 @@ int8_t gConvertTypes[TSDB_DATA_TYPE_BLOB+1][TSDB_DATA_TYPE_BLOB+1] = {
|
|||
/*BIGI*/ 0, 0, 0, 0, 0, 0, 6, 7, 7, 0, 7, 5, 5, 5, 7, 0, 7, 0, 0,
|
||||
/*FLOA*/ 0, 0, 0, 0, 0, 0, 0, 7, 7, 6, 7, 6, 6, 6, 6, 0, 7, 0, 0,
|
||||
/*DOUB*/ 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 7, 7, 7, 0, 7, 0, 0,
|
||||
/*VARC*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 7, 7, 7, 7, 0, 0, 0, 0,
|
||||
/*TIME*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 9, 9, 9, 7, 0, 7, 0, 0,
|
||||
/*VARC*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 9, 0, 7, 7, 7, 7, 0, 0, 0, 0,
|
||||
/*TIME*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 9, 9, 9, 9, 7, 0, 7, 0, 0,
|
||||
/*NCHA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 0, 0, 0, 0,
|
||||
/*UTIN*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 12, 13, 14, 0, 7, 0, 0,
|
||||
/*USMA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 13, 14, 0, 7, 0, 0,
|
||||
|
|
Loading…
Reference in New Issue