Merge branch '3.0' of https://github.com/taosdata/TDengine into 3.0
This commit is contained in:
commit
de82e4826f
|
@ -3820,6 +3820,7 @@ typedef struct {
|
||||||
uint32_t phyLen;
|
uint32_t phyLen;
|
||||||
char* sql;
|
char* sql;
|
||||||
char* msg;
|
char* msg;
|
||||||
|
int8_t source;
|
||||||
} SVDeleteReq;
|
} SVDeleteReq;
|
||||||
|
|
||||||
int32_t tSerializeSVDeleteReq(void* buf, int32_t bufLen, SVDeleteReq* pReq);
|
int32_t tSerializeSVDeleteReq(void* buf, int32_t bufLen, SVDeleteReq* pReq);
|
||||||
|
@ -3841,6 +3842,7 @@ typedef struct SDeleteRes {
|
||||||
char tableFName[TSDB_TABLE_NAME_LEN];
|
char tableFName[TSDB_TABLE_NAME_LEN];
|
||||||
char tsColName[TSDB_COL_NAME_LEN];
|
char tsColName[TSDB_COL_NAME_LEN];
|
||||||
int64_t ctimeMs; // fill by vnode
|
int64_t ctimeMs; // fill by vnode
|
||||||
|
int8_t source;
|
||||||
} SDeleteRes;
|
} SDeleteRes;
|
||||||
|
|
||||||
int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes);
|
int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes);
|
||||||
|
|
|
@ -78,6 +78,7 @@ typedef struct SSchedulerReq {
|
||||||
void* chkKillParam;
|
void* chkKillParam;
|
||||||
SExecResult* pExecRes;
|
SExecResult* pExecRes;
|
||||||
void** pFetchRes;
|
void** pFetchRes;
|
||||||
|
int8_t source;
|
||||||
} SSchedulerReq;
|
} SSchedulerReq;
|
||||||
|
|
||||||
int32_t schedulerInit(void);
|
int32_t schedulerInit(void);
|
||||||
|
|
|
@ -284,6 +284,7 @@ typedef struct SRequestObj {
|
||||||
void* pWrapper;
|
void* pWrapper;
|
||||||
SMetaData parseMeta;
|
SMetaData parseMeta;
|
||||||
char* effectiveUser;
|
char* effectiveUser;
|
||||||
|
int8_t source;
|
||||||
} SRequestObj;
|
} SRequestObj;
|
||||||
|
|
||||||
typedef struct SSyncQueryParam {
|
typedef struct SSyncQueryParam {
|
||||||
|
@ -306,10 +307,10 @@ void doFreeReqResultInfo(SReqResultInfo* pResInfo);
|
||||||
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq);
|
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq);
|
||||||
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code);
|
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code);
|
||||||
|
|
||||||
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly);
|
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly, int8_t source);
|
||||||
TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, int64_t reqid);
|
TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, int64_t reqid);
|
||||||
|
|
||||||
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly);
|
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly, int8_t source);
|
||||||
void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
|
void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
|
||||||
int64_t reqid);
|
int64_t reqid);
|
||||||
void taosAsyncFetchImpl(SRequestObj *pRequest, __taos_async_fn_t fp, void *param);
|
void taosAsyncFetchImpl(SRequestObj *pRequest, __taos_async_fn_t fp, void *param);
|
||||||
|
|
|
@ -743,6 +743,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
|
||||||
.chkKillFp = chkRequestKilled,
|
.chkKillFp = chkRequestKilled,
|
||||||
.chkKillParam = (void*)pRequest->self,
|
.chkKillParam = (void*)pRequest->self,
|
||||||
.pExecRes = &res,
|
.pExecRes = &res,
|
||||||
|
.source = pRequest->source,
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob);
|
int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob);
|
||||||
|
@ -1212,6 +1213,7 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
|
||||||
.chkKillFp = chkRequestKilled,
|
.chkKillFp = chkRequestKilled,
|
||||||
.chkKillParam = (void*)pRequest->self,
|
.chkKillParam = (void*)pRequest->self,
|
||||||
.pExecRes = NULL,
|
.pExecRes = NULL,
|
||||||
|
.source = pRequest->source,
|
||||||
};
|
};
|
||||||
code = schedulerExecJob(&req, &pRequest->body.queryJob);
|
code = schedulerExecJob(&req, &pRequest->body.queryJob);
|
||||||
taosArrayDestroy(pNodeList);
|
taosArrayDestroy(pNodeList);
|
||||||
|
@ -2475,7 +2477,7 @@ void syncQueryFn(void* param, void* res, int32_t code) {
|
||||||
tsem_post(&pParam->sem);
|
tsem_post(&pParam->sem);
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly) {
|
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly, int8_t source) {
|
||||||
if (sql == NULL || NULL == fp) {
|
if (sql == NULL || NULL == fp) {
|
||||||
terrno = TSDB_CODE_INVALID_PARA;
|
terrno = TSDB_CODE_INVALID_PARA;
|
||||||
if (fp) {
|
if (fp) {
|
||||||
|
@ -2501,6 +2503,7 @@ void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp,
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pRequest->source = source;
|
||||||
pRequest->body.queryFp = fp;
|
pRequest->body.queryFp = fp;
|
||||||
doAsyncQuery(pRequest, false);
|
doAsyncQuery(pRequest, false);
|
||||||
}
|
}
|
||||||
|
@ -2535,7 +2538,7 @@ void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_
|
||||||
doAsyncQuery(pRequest, false);
|
doAsyncQuery(pRequest, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) {
|
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly, int8_t source) {
|
||||||
if (NULL == taos) {
|
if (NULL == taos) {
|
||||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -2550,7 +2553,7 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) {
|
||||||
}
|
}
|
||||||
tsem_init(¶m->sem, 0, 0);
|
tsem_init(¶m->sem, 0, 0);
|
||||||
|
|
||||||
taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly);
|
taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, source);
|
||||||
tsem_wait(¶m->sem);
|
tsem_wait(¶m->sem);
|
||||||
|
|
||||||
SRequestObj* pRequest = NULL;
|
SRequestObj* pRequest = NULL;
|
||||||
|
|
|
@ -402,7 +402,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
|
||||||
return pResInfo->userFields;
|
return pResInfo->userFields;
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_RES *taos_query(TAOS *taos, const char *sql) { return taosQueryImpl(taos, sql, false); }
|
TAOS_RES *taos_query(TAOS *taos, const char *sql) { return taosQueryImpl(taos, sql, false, TD_REQ_FROM_APP); }
|
||||||
TAOS_RES *taos_query_with_reqid(TAOS *taos, const char *sql, int64_t reqid) {
|
TAOS_RES *taos_query_with_reqid(TAOS *taos, const char *sql, int64_t reqid) {
|
||||||
return taosQueryImplWithReqid(taos, sql, false, reqid);
|
return taosQueryImplWithReqid(taos, sql, false, reqid);
|
||||||
}
|
}
|
||||||
|
@ -828,7 +828,7 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int taos_validate_sql(TAOS *taos, const char *sql) {
|
int taos_validate_sql(TAOS *taos, const char *sql) {
|
||||||
TAOS_RES *pObj = taosQueryImpl(taos, sql, true);
|
TAOS_RES *pObj = taosQueryImpl(taos, sql, true, TD_REQ_FROM_APP);
|
||||||
|
|
||||||
int code = taos_errno(pObj);
|
int code = taos_errno(pObj);
|
||||||
|
|
||||||
|
@ -1126,7 +1126,7 @@ void continueInsertFromCsv(SSqlCallbackWrapper *pWrapper, SRequestObj *pRequest)
|
||||||
void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) {
|
void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) {
|
||||||
int64_t connId = *(int64_t *)taos;
|
int64_t connId = *(int64_t *)taos;
|
||||||
tscDebug("taos_query_a start with sql:%s", sql);
|
tscDebug("taos_query_a start with sql:%s", sql);
|
||||||
taosAsyncQueryImpl(connId, sql, fp, param, false);
|
taosAsyncQueryImpl(connId, sql, fp, param, false, TD_REQ_FROM_APP);
|
||||||
tscDebug("taos_query_a end with sql:%s", sql);
|
tscDebug("taos_query_a end with sql:%s", sql);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1256,7 +1256,7 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
|
||||||
snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName,
|
snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName,
|
||||||
req.tsColName, req.skey, req.tsColName, req.ekey);
|
req.tsColName, req.skey, req.tsColName, req.ekey);
|
||||||
|
|
||||||
TAOS_RES* res = taos_query(taos, sql);
|
TAOS_RES* res = taosQueryImpl(taos, sql, false, TD_REQ_FROM_TAOX);
|
||||||
SRequestObj* pRequest = (SRequestObj*)res;
|
SRequestObj* pRequest = (SRequestObj*)res;
|
||||||
code = pRequest->code;
|
code = pRequest->code;
|
||||||
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_PAR_GET_META_ERROR) {
|
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_PAR_GET_META_ERROR) {
|
||||||
|
|
|
@ -586,7 +586,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
|
|
||||||
tsNumOfSupportVnodes = tsNumOfCores * 2;
|
tsNumOfSupportVnodes = tsNumOfCores * 2;
|
||||||
tsNumOfSupportVnodes = TMAX(tsNumOfSupportVnodes, 2);
|
tsNumOfSupportVnodes = TMAX(tsNumOfSupportVnodes, 2);
|
||||||
if (cfgAddInt32(pCfg, "supportVnodes", tsNumOfSupportVnodes, 0, 4096, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
if (cfgAddInt32(pCfg, "supportVnodes", tsNumOfSupportVnodes, 0, 4096, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||||
|
|
||||||
if (cfgAddInt32(pCfg, "statusInterval", tsStatusInterval, 1, 30, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
if (cfgAddInt32(pCfg, "statusInterval", tsStatusInterval, 1, 30, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "minSlidingTime", tsMinSlidingTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0)
|
if (cfgAddInt32(pCfg, "minSlidingTime", tsMinSlidingTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0)
|
||||||
|
|
|
@ -7192,6 +7192,7 @@ int32_t tSerializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) {
|
||||||
if (tEncodeU32(&encoder, pReq->sqlLen) < 0) return -1;
|
if (tEncodeU32(&encoder, pReq->sqlLen) < 0) return -1;
|
||||||
if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
|
if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
|
||||||
if (tEncodeBinary(&encoder, pReq->msg, pReq->phyLen) < 0) return -1;
|
if (tEncodeBinary(&encoder, pReq->msg, pReq->phyLen) < 0) return -1;
|
||||||
|
if (tEncodeI8(&encoder, pReq->source) < 0) return -1;
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
int32_t tlen = encoder.pos;
|
int32_t tlen = encoder.pos;
|
||||||
|
@ -7228,6 +7229,9 @@ int32_t tDeserializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) {
|
||||||
if (tDecodeBinaryAlloc(&decoder, (void **)&pReq->msg, &msgLen) < 0) return -1;
|
if (tDecodeBinaryAlloc(&decoder, (void **)&pReq->msg, &msgLen) < 0) return -1;
|
||||||
pReq->phyLen = msgLen;
|
pReq->phyLen = msgLen;
|
||||||
|
|
||||||
|
if (!tDecodeIsEnd(&decoder)) {
|
||||||
|
if (tDecodeI8(&decoder, &pReq->source) < 0) return -1;
|
||||||
|
}
|
||||||
tEndDecode(&decoder);
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
@ -8427,6 +8431,7 @@ int32_t tEncodeDeleteRes(SEncoder *pCoder, const SDeleteRes *pRes) {
|
||||||
if (tEncodeCStr(pCoder, pRes->tableFName) < 0) return -1;
|
if (tEncodeCStr(pCoder, pRes->tableFName) < 0) return -1;
|
||||||
if (tEncodeCStr(pCoder, pRes->tsColName) < 0) return -1;
|
if (tEncodeCStr(pCoder, pRes->tsColName) < 0) return -1;
|
||||||
if (tEncodeI64(pCoder, pRes->ctimeMs) < 0) return -1;
|
if (tEncodeI64(pCoder, pRes->ctimeMs) < 0) return -1;
|
||||||
|
if (tEncodeI8(pCoder, pRes->source) < 0) return -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -8451,6 +8456,9 @@ int32_t tDecodeDeleteRes(SDecoder *pCoder, SDeleteRes *pRes) {
|
||||||
if (!tDecodeIsEnd(pCoder)) {
|
if (!tDecodeIsEnd(pCoder)) {
|
||||||
if (tDecodeI64(pCoder, &pRes->ctimeMs) < 0) return -1;
|
if (tDecodeI64(pCoder, &pRes->ctimeMs) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
if (!tDecodeIsEnd(pCoder)) {
|
||||||
|
if (tDecodeI8(pCoder, &pRes->source) < 0) return -1;
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -111,7 +111,7 @@ STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const
|
||||||
int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status);
|
int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status);
|
||||||
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
|
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
|
||||||
void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo);
|
void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo);
|
||||||
int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans);
|
int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans);
|
||||||
|
|
||||||
SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId);
|
SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId);
|
||||||
int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId);
|
int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId);
|
||||||
|
|
|
@ -1409,24 +1409,6 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
|
||||||
if (strcasecmp(cfgReq.config, "resetlog") == 0) {
|
if (strcasecmp(cfgReq.config, "resetlog") == 0) {
|
||||||
strcpy(dcfgReq.config, "resetlog");
|
strcpy(dcfgReq.config, "resetlog");
|
||||||
#ifdef TD_ENTERPRISE
|
#ifdef TD_ENTERPRISE
|
||||||
} else if (strncasecmp(cfgReq.config, "supportvnodes", 13) == 0) {
|
|
||||||
int32_t optLen = strlen("supportvnodes");
|
|
||||||
int32_t flag = -1;
|
|
||||||
int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
|
|
||||||
if (code < 0) return code;
|
|
||||||
|
|
||||||
if (flag < 0 || flag > 4096) {
|
|
||||||
mError("dnode:%d, failed to config supportVnodes since value:%d. Valid range: [0, 4096]", cfgReq.dnodeId, flag);
|
|
||||||
terrno = TSDB_CODE_OUT_OF_RANGE;
|
|
||||||
goto _err_out;
|
|
||||||
}
|
|
||||||
if (flag == 0) {
|
|
||||||
flag = tsNumOfCores * 2;
|
|
||||||
}
|
|
||||||
flag = TMAX(flag, 2);
|
|
||||||
|
|
||||||
strcpy(dcfgReq.config, "supportvnodes");
|
|
||||||
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
|
||||||
} else if (strncasecmp(cfgReq.config, "s3blocksize", 11) == 0) {
|
} else if (strncasecmp(cfgReq.config, "s3blocksize", 11) == 0) {
|
||||||
int32_t optLen = strlen("s3blocksize");
|
int32_t optLen = strlen("s3blocksize");
|
||||||
int32_t flag = -1;
|
int32_t flag = -1;
|
||||||
|
|
|
@ -1813,7 +1813,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
|
||||||
mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid,
|
mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid,
|
||||||
pStream->name, pTrans->id);
|
pStream->name, pTrans->id);
|
||||||
|
|
||||||
int32_t code = mndStreamSetUpdateEpsetAction(pStream, pChangeInfo, pTrans);
|
int32_t code = mndStreamSetUpdateEpsetAction(pMnode, pStream, pChangeInfo, pTrans);
|
||||||
|
|
||||||
// todo: not continue, drop all and retry again
|
// todo: not continue, drop all and retry again
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
|
@ -462,14 +462,22 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doSetUpdateTaskAction(STrans *pTrans, SStreamTask *pTask, SVgroupChangeInfo *pInfo) {
|
static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, SVgroupChangeInfo *pInfo) {
|
||||||
void *pBuf = NULL;
|
void *pBuf = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
|
streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
|
||||||
|
|
||||||
doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id);
|
doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id);
|
||||||
|
|
||||||
int32_t code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &pTask->info.epSet, 0);
|
SEpSet epset = {0};
|
||||||
|
bool hasEpset = false;
|
||||||
|
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
|
||||||
|
if (code != TSDB_CODE_SUCCESS || !hasEpset) {
|
||||||
|
terrno = code;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &epset, TSDB_CODE_VND_INVALID_VGROUP_ID);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
taosMemoryFree(pBuf);
|
taosMemoryFree(pBuf);
|
||||||
}
|
}
|
||||||
|
@ -478,14 +486,14 @@ static int32_t doSetUpdateTaskAction(STrans *pTrans, SStreamTask *pTask, SVgroup
|
||||||
}
|
}
|
||||||
|
|
||||||
// build trans to update the epset
|
// build trans to update the epset
|
||||||
int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) {
|
int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) {
|
||||||
mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid);
|
mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid);
|
||||||
taosWLockLatch(&pStream->lock);
|
taosWLockLatch(&pStream->lock);
|
||||||
|
|
||||||
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
|
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
|
||||||
while (streamTaskIterNextTask(pIter)) {
|
while (streamTaskIterNextTask(pIter)) {
|
||||||
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
|
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
|
||||||
int32_t code = doSetUpdateTaskAction(pTrans, pTask, pInfo);
|
int32_t code = doSetUpdateTaskAction(pMnode, pTrans, pTask, pInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
destroyStreamTaskIter(pIter);
|
destroyStreamTaskIter(pIter);
|
||||||
taosWUnLockLatch(&pStream->lock);
|
taosWUnLockLatch(&pStream->lock);
|
||||||
|
|
|
@ -263,8 +263,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
} else if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB) {
|
} else if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB) {
|
||||||
PROCESS_EXCLUDED_MSG(SVCreateStbReq, tDecodeSVCreateStbReq)
|
PROCESS_EXCLUDED_MSG(SVCreateStbReq, tDecodeSVCreateStbReq)
|
||||||
} else if (pHead->msgType == TDMT_VND_DELETE) {
|
} else if (pHead->msgType == TDMT_VND_DELETE) {
|
||||||
fetchVer++;
|
PROCESS_EXCLUDED_MSG(SDeleteRes, tDecodeDeleteRes)
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -715,6 +715,7 @@ int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SD
|
||||||
uint64_t tId = req.taskId;
|
uint64_t tId = req.taskId;
|
||||||
int64_t rId = 0;
|
int64_t rId = 0;
|
||||||
int32_t eId = -1;
|
int32_t eId = -1;
|
||||||
|
pRes->source = req.source;
|
||||||
|
|
||||||
SQWMsg qwMsg = {.node = node, .msg = req.msg, .msgLen = req.phyLen, .connInfo = pMsg->info};
|
SQWMsg qwMsg = {.node = node, .msg = req.msg, .msgLen = req.phyLen, .connInfo = pMsg->info};
|
||||||
QW_SCH_TASK_DLOG("processDelete start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, req.sql);
|
QW_SCH_TASK_DLOG("processDelete start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, req.sql);
|
||||||
|
|
|
@ -304,6 +304,7 @@ typedef struct SSchJob {
|
||||||
SSchResInfo userRes;
|
SSchResInfo userRes;
|
||||||
char *sql;
|
char *sql;
|
||||||
SQueryProfileSummary summary;
|
SQueryProfileSummary summary;
|
||||||
|
int8_t source;
|
||||||
} SSchJob;
|
} SSchJob;
|
||||||
|
|
||||||
typedef struct SSchTaskCtx {
|
typedef struct SSchTaskCtx {
|
||||||
|
|
|
@ -746,6 +746,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
|
||||||
pJob->chkKillParam = pReq->chkKillParam;
|
pJob->chkKillParam = pReq->chkKillParam;
|
||||||
pJob->userRes.execFp = pReq->execFp;
|
pJob->userRes.execFp = pReq->execFp;
|
||||||
pJob->userRes.cbParam = pReq->cbParam;
|
pJob->userRes.cbParam = pReq->cbParam;
|
||||||
|
pJob->source = pReq->source;
|
||||||
|
|
||||||
if (pReq->pNodeList == NULL || taosArrayGetSize(pReq->pNodeList) <= 0) {
|
if (pReq->pNodeList == NULL || taosArrayGetSize(pReq->pNodeList) <= 0) {
|
||||||
qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId);
|
qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId);
|
||||||
|
|
|
@ -1086,6 +1086,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
||||||
req.sqlLen = strlen(pJob->sql);
|
req.sqlLen = strlen(pJob->sql);
|
||||||
req.sql = (char *)pJob->sql;
|
req.sql = (char *)pJob->sql;
|
||||||
req.msg = pTask->msg;
|
req.msg = pTask->msg;
|
||||||
|
req.source = pJob->source;
|
||||||
msgSize = tSerializeSVDeleteReq(NULL, 0, &req);
|
msgSize = tSerializeSVDeleteReq(NULL, 0, &req);
|
||||||
msg = taosMemoryCalloc(1, msgSize);
|
msg = taosMemoryCalloc(1, msgSize);
|
||||||
if (NULL == msg) {
|
if (NULL == msg) {
|
||||||
|
|
|
@ -39,7 +39,7 @@ static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEp
|
||||||
stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf);
|
stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
// check for the dispath info and the upstream task info
|
// check for the dispatch info and the upstream task info
|
||||||
int32_t level = pTask->info.taskLevel;
|
int32_t level = pTask->info.taskLevel;
|
||||||
if (level == TASK_LEVEL__SOURCE) {
|
if (level == TASK_LEVEL__SOURCE) {
|
||||||
streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
|
streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
|
||||||
|
@ -622,6 +622,7 @@ void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDo
|
||||||
void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) {
|
void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) {
|
||||||
char buf[512] = {0};
|
char buf[512] = {0};
|
||||||
EPSET_TO_STR(pEpSet, buf);
|
EPSET_TO_STR(pEpSet, buf);
|
||||||
|
int32_t id = pTask->id.taskId;
|
||||||
|
|
||||||
int8_t type = pTask->outputInfo.type;
|
int8_t type = pTask->outputInfo.type;
|
||||||
if (type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
if (type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
|
@ -633,8 +634,8 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
|
||||||
|
|
||||||
if (pVgInfo->vgId == nodeId) {
|
if (pVgInfo->vgId == nodeId) {
|
||||||
epsetAssign(&pVgInfo->epSet, pEpSet);
|
epsetAssign(&pVgInfo->epSet, pEpSet);
|
||||||
stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpset:%s", pTask->id.taskId,
|
stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpset:%s", id, pVgInfo->taskId, nodeId,
|
||||||
pVgInfo->taskId, nodeId, buf);
|
buf);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -642,8 +643,8 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
|
||||||
STaskDispatcherFixed* pDispatcher = &pTask->outputInfo.fixedDispatcher;
|
STaskDispatcherFixed* pDispatcher = &pTask->outputInfo.fixedDispatcher;
|
||||||
if (pDispatcher->nodeId == nodeId) {
|
if (pDispatcher->nodeId == nodeId) {
|
||||||
epsetAssign(&pDispatcher->epSet, pEpSet);
|
epsetAssign(&pDispatcher->epSet, pEpSet);
|
||||||
stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpSet:%s", pTask->id.taskId,
|
stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpset:%s", id, pDispatcher->taskId, nodeId,
|
||||||
pDispatcher->taskId, nodeId, buf);
|
buf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -117,7 +117,7 @@ echo "supportVnodes 1024" >> $TAOS_CFG
|
||||||
echo "statusInterval 1" >> $TAOS_CFG
|
echo "statusInterval 1" >> $TAOS_CFG
|
||||||
echo "dataDir $DATA_DIR" >> $TAOS_CFG
|
echo "dataDir $DATA_DIR" >> $TAOS_CFG
|
||||||
echo "logDir $LOG_DIR" >> $TAOS_CFG
|
echo "logDir $LOG_DIR" >> $TAOS_CFG
|
||||||
echo "debugFlag 0" >> $TAOS_CFG
|
echo "debugFlag 135" >> $TAOS_CFG
|
||||||
echo "tmrDebugFlag 131" >> $TAOS_CFG
|
echo "tmrDebugFlag 131" >> $TAOS_CFG
|
||||||
echo "uDebugFlag 143" >> $TAOS_CFG
|
echo "uDebugFlag 143" >> $TAOS_CFG
|
||||||
echo "rpcDebugFlag 143" >> $TAOS_CFG
|
echo "rpcDebugFlag 143" >> $TAOS_CFG
|
||||||
|
|
|
@ -574,6 +574,7 @@ tmq_t* build_consumer() {
|
||||||
tmq_conf_set(conf, "msg.with.table.name", "true");
|
tmq_conf_set(conf, "msg.with.table.name", "true");
|
||||||
tmq_conf_set(conf, "enable.auto.commit", "true");
|
tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||||
tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
||||||
|
tmq_conf_set(conf, "msg.consume.excluded", "1");
|
||||||
|
|
||||||
if (g_conf.snapShot) {
|
if (g_conf.snapShot) {
|
||||||
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
|
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
|
||||||
|
|
Loading…
Reference in New Issue