Merge branch '3.0' into feature/TD-14481-3.0

This commit is contained in:
Cary Xu 2022-06-07 18:04:49 +08:00
commit 328cd92094
32 changed files with 571 additions and 174 deletions

View File

@ -2485,6 +2485,32 @@ typedef struct {
int32_t tSerializeSUserIndexRsp(void* buf, int32_t bufLen, const SUserIndexRsp* pRsp);
int32_t tDeserializeSUserIndexRsp(void* buf, int32_t bufLen, SUserIndexRsp* pRsp);
typedef struct {
char tbFName[TSDB_TABLE_FNAME_LEN];
} STableIndexReq;
int32_t tSerializeSTableIndexReq(void* buf, int32_t bufLen, STableIndexReq* pReq);
int32_t tDeserializeSTableIndexReq(void* buf, int32_t bufLen, STableIndexReq* pReq);
typedef struct {
int8_t intervalUnit;
int8_t slidingUnit;
int64_t interval;
int64_t offset;
int64_t sliding;
int64_t dstTbUid;
int32_t dstVgId; // for stream
char* expr;
} STableIndexInfo;
typedef struct {
SArray* pIndex;
} STableIndexRsp;
int32_t tSerializeSTableIndexRsp(void* buf, int32_t bufLen, const STableIndexRsp* pRsp);
int32_t tDeserializeSTableIndexRsp(void* buf, int32_t bufLen, STableIndexRsp* pRsp);
typedef struct {
int8_t mqMsgType;
int32_t code;

View File

@ -130,6 +130,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_INDEX, "create-index", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_INDEX, "drop-index", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "get-index", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_TABLE_INDEX, "get-table-index", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp)
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "alter-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "drop-topic", NULL, NULL)
@ -172,10 +173,6 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_STB, "vnode-create-stb", SVCreateStbReq, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_STB, "vnode-alter-stb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", SVDropStbReq, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONSUME, "vnode-mq-consume", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_QUERY, "vnode-mq-query", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONNECT, "vnode-mq-connect", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_DISCONNECT, "vnode-mq-disconnect", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_DELETE, "vnode-mq-vg-delete", SMqVDeleteReq, SMqVDeleteRsp)
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_TASK, "vnode-cancel-task", NULL, NULL)
@ -186,12 +183,8 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_EXPLAIN, "vnode-explain", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqPollReq, SMqDataBlkRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_RUN, "vnode-stream-task-run", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DISPATCH, "vnode-stream-task-dispatch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DISPATCH_WRITE, "vnode-stream-task-dispatch-write", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_RECOVER, "vnode-stream-task-recover", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_DISPATCH_WRITE, "vnode-stream-task-dispatch-write", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL)
@ -205,11 +198,12 @@ enum {
TD_NEW_MSG_SEG(TDMT_QND_MSG)
TD_NEW_MSG_SEG(TDMT_SND_MSG)
TD_DEF_MSG_TYPE(TDMT_SND_TASK_DEPLOY, "snode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
TD_DEF_MSG_TYPE(TDMT_SND_TASK_RUN, "snode-stream-task-run", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SND_TASK_DISPATCH, "snode-stream-task-dispatch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SND_TASK_RECOVER, "snode-stream-task-recover", NULL, NULL)
//shared by snode and vnode
TD_NEW_MSG_SEG(TDMT_STREAM_MSG)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DEPLOY, "stream-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RUN, "stream-task-run", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RECOVER, "stream-task-recover", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_SCH_MSG)
TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL)

View File

@ -272,6 +272,8 @@ int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo);
int32_t catalogGetTableIndex(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* tbFName, SArray** pRes);
int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo);
int32_t catalogChkAuth(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* user, const char* dbFName, AUTH_TYPE type, bool *pass);
@ -280,7 +282,6 @@ int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth);
int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, SEpSet *epSet);
int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId, bool forceUpdate);

View File

@ -2394,6 +2394,102 @@ int32_t tDeserializeSUserIndexRsp(void *buf, int32_t bufLen, SUserIndexRsp *pRsp
return 0;
}
int32_t tSerializeSTableIndexReq(void *buf, int32_t bufLen, STableIndexReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->tbFName) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSTableIndexReq(void *buf, int32_t bufLen, STableIndexReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->tbFName) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSTableIndexInfo(SEncoder *pEncoder, STableIndexInfo* pInfo) {
if (tEncodeI8(pEncoder, pInfo->intervalUnit) < 0) return -1;
if (tEncodeI8(pEncoder, pInfo->slidingUnit) < 0) return -1;
if (tEncodeI64(pEncoder, pInfo->interval) < 0) return -1;
if (tEncodeI64(pEncoder, pInfo->offset) < 0) return -1;
if (tEncodeI64(pEncoder, pInfo->sliding) < 0) return -1;
if (tEncodeI64(pEncoder, pInfo->dstTbUid) < 0) return -1;
if (tEncodeI32(pEncoder, pInfo->dstVgId) < 0) return -1;
if (tEncodeCStr(pEncoder, pInfo->expr) < 0) return -1;
return 0;
}
int32_t tSerializeSTableIndexRsp(void *buf, int32_t bufLen, const STableIndexRsp *pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
int32_t num = taosArrayGetSize(pRsp->pIndex);
if (tEncodeI32(&encoder, num) < 0) return -1;
if (num > 0) {
for (int32_t i = 0; i < num; ++i) {
STableIndexInfo* pInfo = (STableIndexInfo*)taosArrayGet(pRsp->pIndex, i);
if (tSerializeSTableIndexInfo(&encoder, pInfo) < 0) return -1;
}
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSTableIndexInfo(SDecoder *pDecoder, STableIndexInfo *pInfo) {
if (tDecodeI8(pDecoder, &pInfo->intervalUnit) < 0) return -1;
if (tDecodeI8(pDecoder, &pInfo->slidingUnit) < 0) return -1;
if (tDecodeI64(pDecoder, &pInfo->interval) < 0) return -1;
if (tDecodeI64(pDecoder, &pInfo->offset) < 0) return -1;
if (tDecodeI64(pDecoder, &pInfo->sliding) < 0) return -1;
if (tDecodeI64(pDecoder, &pInfo->dstTbUid) < 0) return -1;
if (tDecodeI32(pDecoder, &pInfo->dstVgId) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pInfo->expr) < 0) return -1;
return 0;
}
int32_t tDeserializeSTableIndexRsp(void *buf, int32_t bufLen, STableIndexRsp *pRsp) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
int32_t num = 0;
if (tDecodeI32(&decoder, &num) < 0) return -1;
if (num > 0) {
pRsp->pIndex = taosArrayInit(num, sizeof(STableIndexInfo));
if (NULL == pRsp->pIndex) return -1;
STableIndexInfo info;
for (int32_t i = 0; i < num; ++i) {
if (tDeserializeSTableIndexInfo(&decoder, &info) < 0) return -1;
if (NULL == taosArrayPush(pRsp->pIndex, &info)) {
taosMemoryFree(info.expr);
return -1;
}
}
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSShowReq(void *buf, int32_t bufLen, SShowReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);

View File

@ -181,6 +181,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SMA, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STREAM, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_INDEX, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_TABLE_INDEX, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
@ -210,7 +211,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -94,9 +94,10 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MON_SM_INFO, smPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER;
// Requests handled by SNODE
if (dmSetMgmtHandle(pArray, TDMT_SND_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
/*if (dmSetMgmtHandle(pArray, TDMT_SND_TASK_EXEC, smPutNodeMsgToExecQueue, 0) == NULL) goto _OVER;*/
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
code = 0;
_OVER:

View File

@ -210,8 +210,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1;
}
dDebug("vgId:%d, start to create vnode, tsma:%d standby:%d", createReq.vgId, createReq.isTsma,
createReq.standby);
dDebug("vgId:%d, start to create vnode, tsma:%d standby:%d", createReq.vgId, createReq.isTsma, createReq.standby);
vmGenerateVnodeCfg(&createReq, &vnodeCfg);
if (vmTsmaAdjustDays(&vnodeCfg, &createReq) < 0) {
@ -333,11 +332,6 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_CONNECT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_DISCONNECT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
// if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_SET_CUR, vmPutMsgToWriteQueue, 0)== NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
@ -352,13 +346,14 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RUN, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DISPATCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RECOVER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -95,6 +95,8 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
int32_t vgId = ntohl(pHead->vgId);
if (vgId == QNODE_HANDLE) {
pWrapper = &pDnode->wrappers[QNODE];
} else if (vgId == SNODE_HANDLE) {
pWrapper = &pDnode->wrappers[SNODE];
} else if (vgId == MNODE_HANDLE) {
pWrapper = &pDnode->wrappers[MNODE];
} else {

View File

@ -309,6 +309,7 @@ typedef struct {
int8_t slidingUnit;
int8_t timezone;
int32_t dstVgId; // for stream
int64_t dstTbUid;
int64_t interval;
int64_t offset;
int64_t sliding;

View File

@ -132,7 +132,7 @@ int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SS
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId);
mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_STREAM_TASK_DEPLOY, pVgroup->vgId);
return 0;
}
@ -156,7 +156,7 @@ int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask,
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_SND_TASK_DEPLOY, 0);
mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_STREAM_TASK_DEPLOY, 0);
return 0;
}
@ -222,7 +222,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p
// dispatch
pTask->dispatchType = TASK_DISPATCH__NONE;
mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId);
mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_STREAM_TASK_DEPLOY, pVgroup->vgId);
}
return 0;
}
@ -267,8 +267,7 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr
// dispatch
pTask->dispatchType = TASK_DISPATCH__NONE;
/*mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId);*/
mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pStream->fixedSinkVg.vgId);
mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_STREAM_TASK_DEPLOY, pStream->fixedSinkVg.vgId);
return 0;
}
@ -361,7 +360,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
ASSERT(taosArrayGetSize(pArray) == 1);
SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
/*pTask->dispatchMsgType = TDMT_VND_TASK_MERGE_EXEC;*/
pTask->dispatchMsgType = TDMT_VND_TASK_DISPATCH;
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
pTask->dispatchType = TASK_DISPATCH__FIXED;
pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId;
@ -407,7 +406,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
/*pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;*/
pTask->dispatchMsgType = TDMT_VND_TASK_DISPATCH;
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
ASSERT(pDb);
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
@ -438,7 +437,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
} else {
pTask->dispatchType = TASK_DISPATCH__FIXED;
/*pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;*/
pTask->dispatchMsgType = TDMT_VND_TASK_DISPATCH;
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
SArray* pArray = taosArrayGetP(pStream->tasks, 0);
// one sink only
ASSERT(taosArrayGetSize(pArray) == 1);

View File

@ -40,6 +40,7 @@ static int32_t mndSmaGetVgEpSet(SMnode *pMnode, SDbObj *pDb, SVgEpSet **ppVgEpS
static int32_t mndProcessMCreateSmaReq(SRpcMsg *pReq);
static int32_t mndProcessMDropSmaReq(SRpcMsg *pReq);
static int32_t mndProcessGetSmaReq(SRpcMsg *pReq);
static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq);
static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextSma(SMnode *pMnode, void *pIter);
@ -59,6 +60,7 @@ int32_t mndInitSma(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_VND_CREATE_SMA_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_SMA_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_GET_INDEX, mndProcessGetSmaReq);
mndSetMsgHandle(pMnode, TDMT_MND_GET_TABLE_INDEX, mndProcessGetTbSmaReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndRetrieveSma);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndCancelGetNextSma);
@ -870,6 +872,55 @@ static int32_t mndGetSma(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp
return code;
}
static int32_t mndGetTableSma(SMnode *pMnode, STableIndexReq *indexReq, STableIndexRsp *rsp, bool *exist) {
int32_t code = 0;
SSmaObj *pSma = NULL;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
STableIndexInfo info;
while (1) {
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
if (pIter == NULL) break;
if (pSma->stb[0] != indexReq->tbFName[0] || strcmp(pSma->stb, indexReq->tbFName)) {
continue;
}
info.intervalUnit = pSma->intervalUnit;
info.slidingUnit = pSma->slidingUnit;
info.interval = pSma->interval;
info.offset = pSma->offset;
info.sliding = pSma->sliding;
info.dstTbUid = pSma->dstTbUid;
info.dstVgId = pSma->dstVgId;
info.expr = taosMemoryMalloc(pSma->exprLen + 1);
if (info.expr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1;
sdbRelease(pSdb, pSma);
return code;
}
memcpy(info.expr, pSma->expr, pSma->exprLen);
info.expr[pSma->exprLen] = 0;
if (NULL == taosArrayPush(rsp->pIndex, &info)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1;
taosMemoryFree(info.expr);
sdbRelease(pSdb, pSma);
return code;
}
*exist = true;
sdbRelease(pSdb, pSma);
}
return code;
}
static int32_t mndProcessGetSmaReq(SRpcMsg *pReq) {
SUserIndexReq indexReq = {0};
SMnode *pMnode = pReq->info.node;
@ -916,6 +967,59 @@ _OVER:
return code;
}
static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq) {
STableIndexReq indexReq = {0};
SMnode *pMnode = pReq->info.node;
int32_t code = -1;
STableIndexRsp rsp = {0};
bool exist = false;
if (tDeserializeSTableIndexReq(pReq->pCont, pReq->contLen, &indexReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
rsp.pIndex = taosArrayInit(10, sizeof(STableIndexInfo));
if (NULL == rsp.pIndex) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1;
goto _OVER;
}
code = mndGetTableSma(pMnode, &indexReq, &rsp, &exist);
if (code) {
goto _OVER;
}
if (!exist) {
code = -1;
terrno = TSDB_CODE_MND_DB_INDEX_NOT_EXIST;
} else {
int32_t contLen = tSerializeSTableIndexRsp(NULL, 0, &rsp);
void *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1;
goto _OVER;
}
tSerializeSTableIndexRsp(pRsp, contLen, &rsp);
pReq->info.rsp = pRsp;
pReq->info.rspLen = contLen;
code = 0;
}
_OVER:
if (code != 0) {
mError("failed to get table index %s since %s", indexReq.tbFName, terrstr());
}
return code;
}
static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;

View File

@ -54,8 +54,8 @@ int32_t mndInitStream(SMnode *pMnode) {
};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
mndSetMsgHandle(pMnode, TDMT_VND_TASK_DEPLOY_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_SND_TASK_DEPLOY_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp);
/*mndSetMsgHandle(pMnode, TDMT_SND_TASK_DEPLOY_RSP, mndTransProcessRsp);*/
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);*/
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/

View File

@ -90,7 +90,7 @@ void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
// stream deploy
// stream stop/resume
// operator exec
if (pMsg->msgType == TDMT_SND_TASK_DEPLOY) {
if (pMsg->msgType == TDMT_STREAM_TASK_DEPLOY) {
void *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
SStreamTask *pTask = taosMemoryMalloc(sizeof(SStreamTask));
if (pTask == NULL) {

View File

@ -158,7 +158,7 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
// TODO: handle error
}
break;
case TDMT_VND_TASK_DEPLOY: {
case TDMT_STREAM_TASK_DEPLOY: {
if (tqProcessTaskDeploy(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) {
}
@ -238,21 +238,19 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
case TDMT_VND_TABLE_META:
return vnodeGetTableMeta(pVnode, pMsg);
case TDMT_VND_CONSUME:
return tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId);
case TDMT_VND_TASK_RUN: {
int32_t code = tqProcessTaskRunReq(pVnode->pTq, pMsg);
pMsg->pCont = NULL;
return code;
}
case TDMT_VND_TASK_DISPATCH:
case TDMT_STREAM_TASK_RUN:
return tqProcessTaskRunReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_DISPATCH:
return tqProcessTaskDispatchReq(pVnode->pTq, pMsg);
case TDMT_VND_TASK_RECOVER:
case TDMT_STREAM_TASK_RECOVER:
return tqProcessTaskRecoverReq(pVnode->pTq, pMsg);
case TDMT_VND_TASK_DISPATCH_RSP:
case TDMT_STREAM_TASK_DISPATCH_RSP:
return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
case TDMT_VND_TASK_RECOVER_RSP:
case TDMT_STREAM_TASK_RECOVER_RSP:
return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);
default:
vError("unknown msg type:%d in fetch queue", pMsg->msgType);

View File

@ -310,22 +310,19 @@ typedef struct SCtgCacheOperation {
int32_t opId;
void *data;
bool syncOp;
uint64_t seqId;
tsem_t rspSem;
} SCtgCacheOperation;
typedef struct SCtgQNode {
SCtgCacheOperation op;
SCtgCacheOperation *op;
struct SCtgQNode *next;
} SCtgQNode;
typedef struct SCtgQueue {
SRWLatch qlock;
uint64_t seqId;
uint64_t seqDone;
SCtgQNode *head;
SCtgQNode *tail;
tsem_t reqSem;
tsem_t rspSem;
uint64_t qRemainNum;
} SCtgQueue;
@ -493,6 +490,7 @@ int32_t ctgGetDBVgInfoFromMnode(CTG_PARAMS, SBuildUseDBInput *input, SUseDbOutpu
int32_t ctgGetQnodeListFromMnode(CTG_PARAMS, SArray *out, SCtgTask* pTask);
int32_t ctgGetDBCfgFromMnode(CTG_PARAMS, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask);
int32_t ctgGetIndexInfoFromMnode(CTG_PARAMS, const char *indexName, SIndexInfo *out, SCtgTask* pTask);
int32_t ctgGetTbIndexFromMnode(CTG_PARAMS, const char *tbFName, SArray** out, SCtgTask* pTask);
int32_t ctgGetUdfInfoFromMnode(CTG_PARAMS, const char *funcName, SFuncInfo *out, SCtgTask* pTask);
int32_t ctgGetUserDbAuthFromMnode(CTG_PARAMS, const char *user, SGetUserAuthRsp *out, SCtgTask* pTask);
int32_t ctgGetTbMetaFromMnodeImpl(CTG_PARAMS, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTask* pTask);

View File

@ -506,11 +506,6 @@ int32_t catalogInit(SCatalogCfg *cfg) {
CTG_ERR_RET(TSDB_CODE_CTG_SYS_ERROR);
}
if (tsem_init(&gCtgMgmt.queue.rspSem, 0, 0)) {
qError("tsem_init failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
CTG_ERR_RET(TSDB_CODE_CTG_SYS_ERROR);
}
gCtgMgmt.queue.head = taosMemoryCalloc(1, sizeof(SCtgQNode));
if (NULL == gCtgMgmt.queue.head) {
qError("calloc %d failed", (int32_t)sizeof(SCtgQNode));
@ -1141,6 +1136,17 @@ int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps
CTG_API_LEAVE(ctgGetIndexInfoFromMnode(CTG_PARAMS_LIST(), indexName, pInfo, NULL));
}
int32_t catalogGetTableIndex(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* tbFName, SArray** pRes) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == tbFName || NULL == pRes) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
CTG_API_LEAVE(ctgGetTbIndexFromMnode(CTG_PARAMS_LIST(), tbFName, pRes, NULL));
}
int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo) {
CTG_API_ENTER();
@ -1195,10 +1201,6 @@ void catalogDestroy(void) {
qError("tsem_post failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
}
if (tsem_post(&gCtgMgmt.queue.rspSem)) {
qError("tsem_post failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
}
while (CTG_IS_LOCKED(&gCtgMgmt.lock)) {
taosUsleep(1);
}

View File

@ -501,25 +501,6 @@ _return:
return TSDB_CODE_SUCCESS;
}
void ctgWaitOpDone(SCtgCacheOperation *action) {
while (true) {
tsem_wait(&gCtgMgmt.queue.rspSem);
if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) {
tsem_post(&gCtgMgmt.queue.rspSem);
break;
}
if (gCtgMgmt.queue.seqDone >= action->seqId) {
break;
}
tsem_post(&gCtgMgmt.queue.rspSem);
sched_yield();
}
}
void ctgDequeue(SCtgCacheOperation **op) {
SCtgQNode *orig = gCtgMgmt.queue.head;
@ -530,7 +511,7 @@ void ctgDequeue(SCtgCacheOperation **op) {
taosMemoryFreeClear(orig);
*op = &node->op;
*op = node->op;
}
@ -541,9 +522,11 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
CTG_RET(TSDB_CODE_CTG_MEM_ERROR);
}
operation->seqId = atomic_add_fetch_64(&gCtgMgmt.queue.seqId, 1);
if (operation->syncOp) {
tsem_init(&operation->rspSem, 0, 0);
}
node->op = *operation;
node->op = operation;
CTG_LOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
gCtgMgmt.queue.tail->next = node;
@ -558,7 +541,8 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
ctgDebug("action [%s] added into queue", gCtgCacheOperation[operation->opId].name);
if (operation->syncOp) {
ctgWaitOpDone(operation);
tsem_wait(&operation->rspSem);
taosMemoryFree(operation);
}
return TSDB_CODE_SUCCESS;
@ -567,7 +551,9 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) {
int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_DROP_DB_CACHE};
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_DROP_DB_CACHE;
SCtgDropDBMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDBMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDBMsg));
@ -583,21 +569,24 @@ int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId)
strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
msg->dbId = dbId;
action.data = msg;
op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &action));
CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS;
_return:
taosMemoryFreeClear(action.data);
taosMemoryFreeClear(op->data);
CTG_RET(code);
}
int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_DROP_DB_VGROUP, .syncOp = syncOp};
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_DROP_DB_VGROUP;
op->syncOp = syncOp;
SCtgDropDbVgroupMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDbVgroupMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDbVgroupMsg));
@ -612,15 +601,15 @@ int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncOp)
msg->pCtg = pCtg;
strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
action.data = msg;
op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &action));
CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS;
_return:
taosMemoryFreeClear(action.data);
taosMemoryFreeClear(op->data);
CTG_RET(code);
}
@ -628,7 +617,10 @@ _return:
int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_DROP_STB_META, .syncOp = syncOp};
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_DROP_STB_META;
op->syncOp = syncOp;
SCtgDropStbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropStbMetaMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropStbMetaMsg));
@ -641,15 +633,15 @@ int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId,
msg->dbId = dbId;
msg->suid = suid;
action.data = msg;
op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &action));
CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS;
_return:
taosMemoryFreeClear(action.data);
taosMemoryFreeClear(op->data);
CTG_RET(code);
}
@ -657,7 +649,10 @@ _return:
int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_DROP_TB_META, .syncOp = syncOp};
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_DROP_TB_META;
op->syncOp = syncOp;
SCtgDropTblMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTblMetaMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTblMetaMsg));
@ -669,21 +664,24 @@ int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId,
strncpy(msg->tbName, tbName, sizeof(msg->tbName));
msg->dbId = dbId;
action.data = msg;
op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &action));
CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS;
_return:
taosMemoryFreeClear(action.data);
taosMemoryFreeClear(op->data);
CTG_RET(code);
}
int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, SDBVgInfo* dbInfo, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_UPDATE_VGROUP, .syncOp = syncOp};
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_UPDATE_VGROUP;
op->syncOp = syncOp;
SCtgUpdateVgMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateVgMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg));
@ -701,22 +699,25 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId
msg->dbId = dbId;
msg->dbInfo = dbInfo;
action.data = msg;
op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &action));
CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS;
_return:
ctgFreeVgInfo(dbInfo);
taosMemoryFreeClear(action.data);
taosMemoryFreeClear(op->data);
CTG_RET(code);
}
int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_UPDATE_TB_META, .syncOp = syncOp};
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_UPDATE_TB_META;
op->syncOp = syncOp;
SCtgUpdateTblMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTblMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg));
@ -731,9 +732,9 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool sy
msg->pCtg = pCtg;
msg->output = output;
action.data = msg;
op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &action));
CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS;
@ -746,7 +747,9 @@ _return:
int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEpSet* pEpSet) {
int32_t code = 0;
SCtgCacheOperation operation= {.opId = CTG_OP_UPDATE_VG_EPSET};
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_UPDATE_VG_EPSET;
SCtgUpdateEpsetMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateEpsetMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateEpsetMsg));
@ -758,9 +761,9 @@ int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEp
msg->vgId = vgId;
msg->epSet = *pEpSet;
operation.data = msg;
op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &operation));
CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS;
@ -775,7 +778,10 @@ _return:
int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_UPDATE_USER, .syncOp = syncOp};
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_UPDATE_USER;
op->syncOp = syncOp;
SCtgUpdateUserMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateUserMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateUserMsg));
@ -785,9 +791,9 @@ int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncOp
msg->pCtg = pCtg;
msg->userAuth = *pAuth;
action.data = msg;
op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &action));
CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS;
@ -1607,6 +1613,39 @@ void ctgUpdateThreadUnexpectedStopped(void) {
if (CTG_IS_LOCKED(&gCtgMgmt.lock) > 0) CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock);
}
void ctgCleanupCacheQueue(void) {
SCtgQNode *node = NULL;
SCtgQNode *nodeNext = NULL;
while (true) {
node = gCtgMgmt.queue.head->next;
while (node) {
if (node->op) {
taosMemoryFree(node->op->data);
if (node->op->syncOp) {
tsem_post(&node->op->rspSem);
} else {
taosMemoryFree(node->op);
}
}
nodeNext = node->next;
taosMemoryFree(node);
node = nodeNext;
}
if (CTG_IS_LOCKED(&gCtgMgmt.lock)) {
taosUsleep(1);
} else {
break;
}
}
taosMemoryFreeClear(gCtgMgmt.queue.head);
gCtgMgmt.queue.tail = NULL;
}
void* ctgUpdateThreadFunc(void* param) {
setThreadName("catalog");
#ifdef WINDOWS
@ -1622,7 +1661,8 @@ void* ctgUpdateThreadFunc(void* param) {
}
if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) {
tsem_post(&gCtgMgmt.queue.rspSem);
CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock);
ctgCleanupCacheQueue();
break;
}
@ -1634,10 +1674,8 @@ void* ctgUpdateThreadFunc(void* param) {
(*gCtgCacheOperation[operation->opId].func)(operation);
gCtgMgmt.queue.seqDone = operation->seqId;
if (operation->syncOp) {
tsem_post(&gCtgMgmt.queue.rspSem);
tsem_post(&operation->rspSem);
}
CTG_RT_STAT_INC(qDoneNum, 1);

View File

@ -85,6 +85,21 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
qDebug("Got index from mnode, indexName:%s", target);
break;
}
case TDMT_MND_GET_TABLE_INDEX: {
if (TSDB_CODE_SUCCESS != rspCode) {
qError("error rsp for get table index, error:%s, tbFName:%s", tstrerror(rspCode), target);
CTG_ERR_RET(rspCode);
}
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) {
qError("Process get table index rsp failed, error:%s, tbFName:%s", tstrerror(code), target);
CTG_ERR_RET(code);
}
qDebug("Got table index from mnode, tbFName:%s", target);
break;
}
case TDMT_MND_RETRIEVE_FUNC: {
if (TSDB_CODE_SUCCESS != rspCode) {
qError("error rsp for get udf, error:%s, funcName:%s", tstrerror(rspCode), target);
@ -412,6 +427,44 @@ int32_t ctgGetIndexInfoFromMnode(CTG_PARAMS, const char *indexName, SIndexInfo *
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetTbIndexFromMnode(CTG_PARAMS, const char *tbFName, SArray** out, SCtgTask* pTask) {
char *msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_TABLE_INDEX;
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get tb index from mnode, tbFName:%s", tbFName);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)tbFName, &msg, 0, &msgLen, mallocFp);
if (code) {
ctgError("Build get index msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
CTG_ERR_RET(code);
}
if (pTask) {
void* pOut = taosMemoryCalloc(1, POINTER_BYTES);
if (NULL == pOut) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)tbFName));
CTG_RET(ctgAsyncSendMsg(CTG_PARAMS_LIST(), pTask, reqType, msg, msgLen));
}
SRpcMsg rpcMsg = {
.msgType = reqType,
.pCont = msg,
.contLen = msgLen,
};
SRpcMsg rpcRsp = {0};
rpcSendRecv(pTrans, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName));
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetUdfInfoFromMnode(CTG_PARAMS, const char *funcName, SFuncInfo *out, SCtgTask* pTask) {
char *msg = NULL;
int32_t msgLen = 0;

View File

@ -203,6 +203,24 @@ int32_t queryBuildGetUserAuthMsg(void *input, char **msg, int32_t msgSize, int32
return TSDB_CODE_SUCCESS;
}
int32_t queryBuildGetTbIndexMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) {
if (NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
STableIndexReq indexReq = {0};
strcpy(indexReq.tbFName, input);
int32_t bufLen = tSerializeSTableIndexReq(NULL, 0, &indexReq);
void *pBuf = (*mallcFp)(bufLen);
tSerializeSTableIndexReq(pBuf, bufLen, &indexReq);
*msg = pBuf;
*msgLen = bufLen;
return TSDB_CODE_SUCCESS;
}
int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) {
SUseDbOutput *pOut = output;
@ -459,6 +477,22 @@ int32_t queryProcessGetUserAuthRsp(void *output, char *msg, int32_t msgSize) {
return TSDB_CODE_SUCCESS;
}
int32_t queryProcessGetTbIndexRsp(void *output, char *msg, int32_t msgSize) {
if (NULL == output || NULL == msg || msgSize <= 0) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
STableIndexRsp out = {0};
if (tDeserializeSTableIndexRsp(msg, msgSize, &out) != 0) {
qError("tDeserializeSTableIndexRsp failed, msgSize:%d", msgSize);
return TSDB_CODE_INVALID_MSG;
}
*(void **)output = out.pIndex;
return TSDB_CODE_SUCCESS;
}
void initQueryModuleMsgHandle() {
queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg;
@ -469,7 +503,7 @@ void initQueryModuleMsgHandle() {
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryBuildGetIndexMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryBuildRetrieveFuncMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_USER_AUTH)] = queryBuildGetUserAuthMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_TABLE_INDEX)] = queryBuildGetTbIndexMsg;
queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp;
@ -479,6 +513,7 @@ void initQueryModuleMsgHandle() {
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryProcessGetIndexRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryProcessRetrieveFuncRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_USER_AUTH)] = queryProcessGetUserAuthRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_TABLE_INDEX)] = queryProcessGetTbIndexRsp;
}
#pragma GCC diagnostic pop

View File

@ -356,7 +356,7 @@ int32_t qwOpenRef(void);
void qwSetHbParam(int64_t refId, SQWHbParam **pParam);
int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type);
int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type);
void qwClearExpiredSch(SArray* pExpiredSch);
void qwClearExpiredSch(SQWorker *mgmt, SArray* pExpiredSch);
int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch);
void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx);

View File

@ -539,8 +539,23 @@ int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) {
}
void qwClearExpiredSch(SArray* pExpiredSch) {
void qwClearExpiredSch(SQWorker *mgmt, SArray* pExpiredSch) {
int32_t num = taosArrayGetSize(pExpiredSch);
for (int32_t i = 0; i < num; ++i) {
uint64_t *sId = taosArrayGet(pExpiredSch, i);
SQWSchStatus *pSch = NULL;
if (qwAcquireScheduler(mgmt, *sId, QW_WRITE, &pSch)) {
continue;
}
if (taosHashGetSize(pSch->tasksHash) <= 0) {
qwDestroySchStatus(pSch);
taosHashRemove(mgmt->schHash, sId, sizeof(*sId));
qError("sch %" PRIx64 "destroyed", *sId);
}
qwReleaseScheduler(QW_WRITE, mgmt);
}
}

View File

@ -790,9 +790,10 @@ int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
}
QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));
QW_ERR_JRET(qwRegisterHbBrokenLinkArg(mgmt, req->sId, &qwMsg->connInfo));
sch->hbBrokenTs = 0;
QW_LOCK(QW_WRITE, &sch->hbConnLock);
if (sch->hbConnInfo.handle) {
@ -912,7 +913,7 @@ _return:
}
if (taosArrayGetSize(pExpiredSch) > 0) {
qwClearExpiredSch(pExpiredSch);
qwClearExpiredSch(mgmt, pExpiredSch);
}
taosMemoryFreeClear(rspList);

View File

@ -207,6 +207,7 @@ typedef struct SSchJob {
SArray *dataSrcTasks; // SArray<SQueryTask*>
int32_t levelIdx;
SEpSet dataSrcEps;
SHashObj *taskList;
SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask*
SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask*
SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask*

View File

@ -64,6 +64,13 @@ int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *pTrans, SArray *pN
pJob->nodeList = taosArrayDup(pNodeList);
}
pJob->taskList =
taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == pJob->taskList) {
SCH_JOB_ELOG("taosHashInit %d taskList failed", pDag->numOfSubplans);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_ERR_JRET(schValidateAndBuildJob(pDag, pJob));
if (SCH_IS_EXPLAIN_JOB(pJob)) {
@ -486,23 +493,26 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
SCH_SET_JOB_TYPE(pJob, plan->subplanType);
SSchTask task = {0};
SSchTask *pTask = &task;
SCH_ERR_JRET(schInitTask(pJob, &task, plan, pLevel));
void *p = taosArrayPush(pLevel->subTasks, &task);
if (NULL == p) {
SSchTask *pTask = taosArrayPush(pLevel->subTasks, &task);
if (NULL == pTask) {
SCH_TASK_ELOG("taosArrayPush task to level failed, level:%d, taskIdx:%d", pLevel->level, n);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_ERR_JRET(schRecordQueryDataSrc(pJob, p));
SCH_ERR_JRET(schRecordQueryDataSrc(pJob, pTask));
if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &p, POINTER_BYTES)) {
if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &pTask, POINTER_BYTES)) {
SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d", n);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (0 != taosHashPut(pJob->taskList, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES)) {
SCH_TASK_ELOG("taosHashPut to taskList failed, taskIdx:%d", n);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
++pJob->taskNum;
}
@ -1276,15 +1286,11 @@ int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTas
}
int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask) {
schGetTaskFromList(pJob->execTasks, taskId, pTask);
schGetTaskFromList(pJob->taskList, taskId, pTask);
if (NULL == *pTask) {
schGetTaskFromList(pJob->succTasks, taskId, pTask);
if (NULL == *pTask) {
SCH_JOB_ELOG("task not found in execList & succList, taskId:%" PRIx64, taskId);
SCH_JOB_ELOG("task not found in job task list, taskId:%" PRIx64, taskId);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
}
return TSDB_CODE_SUCCESS;
}
@ -1445,6 +1451,7 @@ void schFreeJobImpl(void *job) {
taosHashCleanup(pJob->execTasks);
taosHashCleanup(pJob->failTasks);
taosHashCleanup(pJob->succTasks);
taosHashCleanup(pJob->taskList);
taosArrayDestroy(pJob->levels);
taosArrayDestroy(pJob->nodeList);

View File

@ -26,7 +26,7 @@ int32_t streamTriggerByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb) {
pRunReq->streamId = pTask->streamId;
pRunReq->taskId = pTask->taskId;
SRpcMsg msg = {
.msgType = TDMT_VND_TASK_RUN,
.msgType = TDMT_STREAM_TASK_RUN,
.pCont = pRunReq,
.contLen = sizeof(SStreamTaskRunReq),
};

View File

@ -190,9 +190,9 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* dat
}
int32_t qType;
if (pTask->dispatchMsgType == TDMT_VND_TASK_DISPATCH || pTask->dispatchMsgType == TDMT_SND_TASK_DISPATCH) {
if (pTask->dispatchMsgType == TDMT_STREAM_TASK_DISPATCH) {
qType = FETCH_QUEUE;
} else if (pTask->dispatchMsgType == TDMT_VND_TASK_DISPATCH_WRITE) {
} else if (pTask->dispatchMsgType == TDMT_VND_STREAM_DISPATCH_WRITE) {
qType = WRITE_QUEUE;
} else {
ASSERT(0);

View File

@ -131,7 +131,7 @@ if __name__ == "__main__":
is_test_framework = 0
key_word = 'tdCases.addWindows'
try:
if key_word in open(fileName).read():
if key_word in open(fileName, encoding='UTF-8').read():
is_test_framework = 1
except:
pass

View File

@ -416,7 +416,7 @@ class TDDnode:
psCmd, shell=True).decode("utf-8")
if not platform.system().lower() == 'windows':
for port in range(6030, 6041):
fuserCmd = "fuser -k -n tcp %d" % port
fuserCmd = "fuser -k -n tcp %d > /dev/null" % port
os.system(fuserCmd)
if self.valgrind:
time.sleep(2)

View File

@ -143,7 +143,7 @@ class TDTestCase:
tdSql.checkRows(9)
tdSql.query("select * from t1 where c5 between 136 and 127")
tdSql.checkRows(0)
tdSql.query("select * from t1 where c5 between '~' and 'ˆ'")
tdSql.query("select * from t1 where c5 between '~' and '^'")
tdSql.checkRows(0)
tdSql.query("select * from t1 where c5 not between 1 and 2")
# tdSql.checkRows(0)

View File

@ -565,7 +565,7 @@ class TDTestCase:
if data_ct4_c10[i] is None:
tdSql.checkData( i, 0, None )
else:
time2str = str(int(datetime.datetime.timestamp(data_ct4_c10[i])*1000))
time2str = str(int((data_ct4_c10[i]-datetime.datetime.fromtimestamp(0)).total_seconds()*1000))
tdSql.checkData( i, 0, time2str )
tdSql.query("select cast(c10 as nchar(32)) as b from t1")
for i in range(len(data_t1_c10)):
@ -574,7 +574,7 @@ class TDTestCase:
elif i == 10:
continue
else:
time2str = str(int(datetime.datetime.timestamp(data_t1_c10[i])*1000))
time2str = str(int((data_t1_c10[i]-datetime.datetime.fromtimestamp(0)).total_seconds()*1000))
tdSql.checkData( i, 0, time2str )
tdLog.printNoPrefix("==========step38: cast timestamp to binary, expect no changes ")
@ -583,7 +583,7 @@ class TDTestCase:
if data_ct4_c10[i] is None:
tdSql.checkData( i, 0, None )
else:
time2str = str(int(datetime.datetime.timestamp(data_ct4_c10[i])*1000))
time2str = str(int((data_ct4_c10[i]-datetime.datetime.fromtimestamp(0)).total_seconds()*1000))
tdSql.checkData( i, 0, time2str )
tdSql.query("select cast(c10 as binary(32)) as b from t1")
for i in range(len(data_t1_c10)):
@ -592,7 +592,7 @@ class TDTestCase:
elif i == 10:
continue
else:
time2str = str(int(datetime.datetime.timestamp(data_t1_c10[i])*1000))
time2str = str(int((data_t1_c10[i]-datetime.datetime.fromtimestamp(0)).total_seconds()*1000))
tdSql.checkData( i, 0, time2str )
tdLog.printNoPrefix("==========step39: cast constant operation to bigint, expect change to int ")

View File

@ -13,6 +13,9 @@ echo Linux Taosd Test
for /F "usebackq tokens=*" %%i in (fulltest.bat) do (
for /f "tokens=1* delims= " %%a in ("%%i") do if not "%%a" == "@REM" (
echo Processing %%i
call :GetTimeSeconds %time%
set time1=!_timeTemp!
echo Start at %time%
set /a a+=1
call %%i ARG1 -m %1 > result_!a!.txt 2>error_!a!.txt
if errorlevel 1 ( call :colorEcho 0c "failed" &echo. && echo result: && cat result_!a!.txt && echo error: && cat error_!a!.txt && exit 8 ) else ( call :colorEcho 0a "Success" &echo. )
@ -21,7 +24,33 @@ for /F "usebackq tokens=*" %%i in (fulltest.bat) do (
exit
:colorEcho
call :GetTimeSeconds %time%
set time2=%_timeTemp%
set /a interTime=%time2% - %time1%
echo End at %time% , cast %interTime%s
echo off
<nul set /p ".=%DEL%" > "%~2"
findstr /v /a:%1 /R "^$" "%~2" nul
del "%~2" > nul 2>&1i
goto :eof
:GetTimeSeconds
set tt=%1
set tt=%tt:.= %
set tt=%tt::= %
set index=1
for %%a in (%tt%) do (
if !index! EQU 1 (
set hh=%%a
)^
else if !index! EQU 2 (
set mm=%%a
)^
else if !index! EQU 3 (
set ss=%%a
)
set /a index=index+1
)
set /a _timeTemp=(%hh%*60+%mm%)*60+%ss%
goto :eof

View File

@ -168,7 +168,7 @@ if __name__ == "__main__":
key_word = 'tdCases.addWindows'
is_test_framework = 0
try:
if key_word in open(fileName).read():
if key_word in open(fileName, encoding='UTF-8').read():
is_test_framework = 1
except:
pass