Merge remote-tracking branch 'origin/3.0' into feature/3.0_wxy

This commit is contained in:
Xiaoyu Wang 2022-06-08 09:26:16 +08:00
commit 08209f30ae
44 changed files with 3675 additions and 3524 deletions

View File

@ -2485,6 +2485,32 @@ typedef struct {
int32_t tSerializeSUserIndexRsp(void* buf, int32_t bufLen, const SUserIndexRsp* pRsp); int32_t tSerializeSUserIndexRsp(void* buf, int32_t bufLen, const SUserIndexRsp* pRsp);
int32_t tDeserializeSUserIndexRsp(void* buf, int32_t bufLen, SUserIndexRsp* pRsp); int32_t tDeserializeSUserIndexRsp(void* buf, int32_t bufLen, SUserIndexRsp* pRsp);
typedef struct {
char tbFName[TSDB_TABLE_FNAME_LEN];
} STableIndexReq;
int32_t tSerializeSTableIndexReq(void* buf, int32_t bufLen, STableIndexReq* pReq);
int32_t tDeserializeSTableIndexReq(void* buf, int32_t bufLen, STableIndexReq* pReq);
typedef struct {
int8_t intervalUnit;
int8_t slidingUnit;
int64_t interval;
int64_t offset;
int64_t sliding;
int64_t dstTbUid;
int32_t dstVgId; // for stream
char* expr;
} STableIndexInfo;
typedef struct {
SArray* pIndex;
} STableIndexRsp;
int32_t tSerializeSTableIndexRsp(void* buf, int32_t bufLen, const STableIndexRsp* pRsp);
int32_t tDeserializeSTableIndexRsp(void* buf, int32_t bufLen, STableIndexRsp* pRsp);
typedef struct { typedef struct {
int8_t mqMsgType; int8_t mqMsgType;
int32_t code; int32_t code;

View File

@ -130,6 +130,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_INDEX, "create-index", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_INDEX, "create-index", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_INDEX, "drop-index", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_INDEX, "drop-index", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "get-index", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "get-index", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_TABLE_INDEX, "get-table-index", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp)
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "alter-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "drop-topic", NULL, NULL)
@ -172,10 +173,6 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_STB, "vnode-create-stb", SVCreateStbReq, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_STB, "vnode-create-stb", SVCreateStbReq, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_STB, "vnode-alter-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_STB, "vnode-alter-stb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", SVDropStbReq, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", SVDropStbReq, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONSUME, "vnode-mq-consume", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_QUERY, "vnode-mq-query", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONNECT, "vnode-mq-connect", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_DISCONNECT, "vnode-mq-disconnect", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp) TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_DELETE, "vnode-mq-vg-delete", SMqVDeleteReq, SMqVDeleteRsp) TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_DELETE, "vnode-mq-vg-delete", SMqVDeleteReq, SMqVDeleteRsp)
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_TASK, "vnode-cancel-task", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_TASK, "vnode-cancel-task", NULL, NULL)
@ -186,12 +183,8 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_EXPLAIN, "vnode-explain", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_EXPLAIN, "vnode-explain", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqPollReq, SMqDataBlkRsp) TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqPollReq, SMqDataBlkRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_RUN, "vnode-stream-task-run", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_DISPATCH_WRITE, "vnode-stream-task-dispatch-write", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DISPATCH, "vnode-stream-task-dispatch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DISPATCH_WRITE, "vnode-stream-task-dispatch-write", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_RECOVER, "vnode-stream-task-recover", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL)
@ -205,11 +198,12 @@ enum {
TD_NEW_MSG_SEG(TDMT_QND_MSG) TD_NEW_MSG_SEG(TDMT_QND_MSG)
TD_NEW_MSG_SEG(TDMT_SND_MSG) //shared by snode and vnode
TD_DEF_MSG_TYPE(TDMT_SND_TASK_DEPLOY, "snode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) TD_NEW_MSG_SEG(TDMT_STREAM_MSG)
TD_DEF_MSG_TYPE(TDMT_SND_TASK_RUN, "snode-stream-task-run", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DEPLOY, "stream-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
TD_DEF_MSG_TYPE(TDMT_SND_TASK_DISPATCH, "snode-stream-task-dispatch", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RUN, "stream-task-run", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SND_TASK_RECOVER, "snode-stream-task-recover", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RECOVER, "stream-task-recover", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_SCH_MSG) TD_NEW_MSG_SEG(TDMT_SCH_MSG)
TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL)

View File

@ -272,6 +272,8 @@ int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo); int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo);
int32_t catalogGetTableIndex(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* tbFName, SArray** pRes);
int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo); int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo);
int32_t catalogChkAuth(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* user, const char* dbFName, AUTH_TYPE type, bool *pass); int32_t catalogChkAuth(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* user, const char* dbFName, AUTH_TYPE type, bool *pass);
@ -280,7 +282,6 @@ int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth);
int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, SEpSet *epSet); int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, SEpSet *epSet);
int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId, bool forceUpdate); int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId, bool forceUpdate);

View File

@ -131,14 +131,7 @@ static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) {
SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq); SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq);
static FORCE_INLINE void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) { void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit);
int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1);
ASSERT(ref >= 0);
if (ref == 0) {
taosMemoryFree(pDataSubmit->data);
taosMemoryFree(pDataSubmit->dataRef);
}
}
SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit); SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit);
@ -189,6 +182,7 @@ typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data);
typedef struct { typedef struct {
int64_t smaId; int64_t smaId;
// following are not applicable to encoder and decoder // following are not applicable to encoder and decoder
void* vnode;
FSmaSink* smaSink; FSmaSink* smaSink;
} STaskSinkSma; } STaskSinkSma;
@ -270,7 +264,7 @@ struct SStreamTask {
SStreamQueue* outputQueue; SStreamQueue* outputQueue;
// application storage // application storage
void* ahandle; // void* ahandle;
}; };
SStreamTask* tNewSStreamTask(int64_t streamId, int32_t childId); SStreamTask* tNewSStreamTask(int64_t streamId, int32_t childId);
@ -311,7 +305,16 @@ static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) {
} }
static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) { static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) {
taosWriteQitem(pTask->outputQueue->queue, pBlock); if (pTask->sinkType == TASK_SINK__TABLE) {
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
} else if (pTask->sinkType == TASK_SINK__SMA) {
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
} else {
ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);
taosWriteQitem(pTask->outputQueue->queue, pBlock);
}
return 0; return 0;
} }
@ -319,26 +322,11 @@ typedef struct {
int32_t reserved; int32_t reserved;
} SStreamTaskDeployRsp; } SStreamTaskDeployRsp;
typedef struct {
// SMsgHead head;
int64_t streamId;
int32_t taskId;
SArray* data; // SArray<SSDataBlock>
} SStreamTaskExecReq;
typedef struct { typedef struct {
// SMsgHead head; // SMsgHead head;
SStreamTask* task; SStreamTask* task;
} SStreamTaskDeployReq; } SStreamTaskDeployReq;
int32_t tEncodeSStreamTaskExecReq(void** buf, const SStreamTaskExecReq* pReq);
void* tDecodeSStreamTaskExecReq(const void* buf, SStreamTaskExecReq* pReq);
void tFreeSStreamTaskExecReq(SStreamTaskExecReq* pReq);
typedef struct {
int32_t reserved;
} SStreamTaskExecRsp;
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;
int64_t streamId; int64_t streamId;

View File

@ -2394,6 +2394,102 @@ int32_t tDeserializeSUserIndexRsp(void *buf, int32_t bufLen, SUserIndexRsp *pRsp
return 0; return 0;
} }
int32_t tSerializeSTableIndexReq(void *buf, int32_t bufLen, STableIndexReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->tbFName) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSTableIndexReq(void *buf, int32_t bufLen, STableIndexReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->tbFName) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSTableIndexInfo(SEncoder *pEncoder, STableIndexInfo* pInfo) {
if (tEncodeI8(pEncoder, pInfo->intervalUnit) < 0) return -1;
if (tEncodeI8(pEncoder, pInfo->slidingUnit) < 0) return -1;
if (tEncodeI64(pEncoder, pInfo->interval) < 0) return -1;
if (tEncodeI64(pEncoder, pInfo->offset) < 0) return -1;
if (tEncodeI64(pEncoder, pInfo->sliding) < 0) return -1;
if (tEncodeI64(pEncoder, pInfo->dstTbUid) < 0) return -1;
if (tEncodeI32(pEncoder, pInfo->dstVgId) < 0) return -1;
if (tEncodeCStr(pEncoder, pInfo->expr) < 0) return -1;
return 0;
}
int32_t tSerializeSTableIndexRsp(void *buf, int32_t bufLen, const STableIndexRsp *pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
int32_t num = taosArrayGetSize(pRsp->pIndex);
if (tEncodeI32(&encoder, num) < 0) return -1;
if (num > 0) {
for (int32_t i = 0; i < num; ++i) {
STableIndexInfo* pInfo = (STableIndexInfo*)taosArrayGet(pRsp->pIndex, i);
if (tSerializeSTableIndexInfo(&encoder, pInfo) < 0) return -1;
}
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSTableIndexInfo(SDecoder *pDecoder, STableIndexInfo *pInfo) {
if (tDecodeI8(pDecoder, &pInfo->intervalUnit) < 0) return -1;
if (tDecodeI8(pDecoder, &pInfo->slidingUnit) < 0) return -1;
if (tDecodeI64(pDecoder, &pInfo->interval) < 0) return -1;
if (tDecodeI64(pDecoder, &pInfo->offset) < 0) return -1;
if (tDecodeI64(pDecoder, &pInfo->sliding) < 0) return -1;
if (tDecodeI64(pDecoder, &pInfo->dstTbUid) < 0) return -1;
if (tDecodeI32(pDecoder, &pInfo->dstVgId) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pInfo->expr) < 0) return -1;
return 0;
}
int32_t tDeserializeSTableIndexRsp(void *buf, int32_t bufLen, STableIndexRsp *pRsp) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
int32_t num = 0;
if (tDecodeI32(&decoder, &num) < 0) return -1;
if (num > 0) {
pRsp->pIndex = taosArrayInit(num, sizeof(STableIndexInfo));
if (NULL == pRsp->pIndex) return -1;
STableIndexInfo info;
for (int32_t i = 0; i < num; ++i) {
if (tDeserializeSTableIndexInfo(&decoder, &info) < 0) return -1;
if (NULL == taosArrayPush(pRsp->pIndex, &info)) {
taosMemoryFree(info.expr);
return -1;
}
}
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSShowReq(void *buf, int32_t bufLen, SShowReq *pReq) { int32_t tSerializeSShowReq(void *buf, int32_t bufLen, SShowReq *pReq) {
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);

View File

@ -181,6 +181,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SMA, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SMA, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STREAM, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STREAM, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_INDEX, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_GET_INDEX, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_TABLE_INDEX, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
@ -210,7 +211,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -94,9 +94,10 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MON_SM_INFO, smPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MON_SM_INFO, smPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER;
// Requests handled by SNODE if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SND_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
/*if (dmSetMgmtHandle(pArray, TDMT_SND_TASK_EXEC, smPutNodeMsgToExecQueue, 0) == NULL) goto _OVER;*/ if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
code = 0; code = 0;
_OVER: _OVER:

View File

@ -210,8 +210,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1; return -1;
} }
dDebug("vgId:%d, start to create vnode, tsma:%d standby:%d", createReq.vgId, createReq.isTsma, dDebug("vgId:%d, start to create vnode, tsma:%d standby:%d", createReq.vgId, createReq.isTsma, createReq.standby);
createReq.standby);
vmGenerateVnodeCfg(&createReq, &vnodeCfg); vmGenerateVnodeCfg(&createReq, &vnodeCfg);
if (vmTsmaAdjustDays(&vnodeCfg, &createReq) < 0) { if (vmTsmaAdjustDays(&vnodeCfg, &createReq) < 0) {
@ -333,11 +332,6 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_CONNECT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_DISCONNECT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
// if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_SET_CUR, vmPutMsgToWriteQueue, 0)== NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
@ -352,13 +346,14 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RUN, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DISPATCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RECOVER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -95,6 +95,8 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
int32_t vgId = ntohl(pHead->vgId); int32_t vgId = ntohl(pHead->vgId);
if (vgId == QNODE_HANDLE) { if (vgId == QNODE_HANDLE) {
pWrapper = &pDnode->wrappers[QNODE]; pWrapper = &pDnode->wrappers[QNODE];
} else if (vgId == SNODE_HANDLE) {
pWrapper = &pDnode->wrappers[SNODE];
} else if (vgId == MNODE_HANDLE) { } else if (vgId == MNODE_HANDLE) {
pWrapper = &pDnode->wrappers[MNODE]; pWrapper = &pDnode->wrappers[MNODE];
} else { } else {

View File

@ -309,6 +309,7 @@ typedef struct {
int8_t slidingUnit; int8_t slidingUnit;
int8_t timezone; int8_t timezone;
int32_t dstVgId; // for stream int32_t dstVgId; // for stream
int64_t dstTbUid;
int64_t interval; int64_t interval;
int64_t offset; int64_t offset;
int64_t sliding; int64_t sliding;

View File

@ -132,7 +132,7 @@ int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SS
terrno = TSDB_CODE_QRY_INVALID_INPUT; terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1; return -1;
} }
mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId); mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_STREAM_TASK_DEPLOY, pVgroup->vgId);
return 0; return 0;
} }
@ -156,7 +156,7 @@ int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask,
terrno = TSDB_CODE_QRY_INVALID_INPUT; terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1; return -1;
} }
mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_SND_TASK_DEPLOY, 0); mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_STREAM_TASK_DEPLOY, 0);
return 0; return 0;
} }
@ -222,7 +222,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p
// dispatch // dispatch
pTask->dispatchType = TASK_DISPATCH__NONE; pTask->dispatchType = TASK_DISPATCH__NONE;
mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId); mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_STREAM_TASK_DEPLOY, pVgroup->vgId);
} }
return 0; return 0;
} }
@ -267,8 +267,7 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr
// dispatch // dispatch
pTask->dispatchType = TASK_DISPATCH__NONE; pTask->dispatchType = TASK_DISPATCH__NONE;
/*mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId);*/ mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_STREAM_TASK_DEPLOY, pStream->fixedSinkVg.vgId);
mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pStream->fixedSinkVg.vgId);
return 0; return 0;
} }
@ -361,7 +360,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
ASSERT(taosArrayGetSize(pArray) == 1); ASSERT(taosArrayGetSize(pArray) == 1);
SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0); SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
/*pTask->dispatchMsgType = TDMT_VND_TASK_MERGE_EXEC;*/ /*pTask->dispatchMsgType = TDMT_VND_TASK_MERGE_EXEC;*/
pTask->dispatchMsgType = TDMT_VND_TASK_DISPATCH; pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
pTask->dispatchType = TASK_DISPATCH__FIXED; pTask->dispatchType = TASK_DISPATCH__FIXED;
pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId; pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId;
@ -407,7 +406,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
pTask->dispatchType = TASK_DISPATCH__SHUFFLE; pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
/*pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;*/ /*pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;*/
pTask->dispatchMsgType = TDMT_VND_TASK_DISPATCH; pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb); SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
ASSERT(pDb); ASSERT(pDb);
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) { if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
@ -438,7 +437,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
} else { } else {
pTask->dispatchType = TASK_DISPATCH__FIXED; pTask->dispatchType = TASK_DISPATCH__FIXED;
/*pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;*/ /*pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;*/
pTask->dispatchMsgType = TDMT_VND_TASK_DISPATCH; pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
SArray* pArray = taosArrayGetP(pStream->tasks, 0); SArray* pArray = taosArrayGetP(pStream->tasks, 0);
// one sink only // one sink only
ASSERT(taosArrayGetSize(pArray) == 1); ASSERT(taosArrayGetSize(pArray) == 1);

View File

@ -40,6 +40,7 @@ static int32_t mndSmaGetVgEpSet(SMnode *pMnode, SDbObj *pDb, SVgEpSet **ppVgEpS
static int32_t mndProcessMCreateSmaReq(SRpcMsg *pReq); static int32_t mndProcessMCreateSmaReq(SRpcMsg *pReq);
static int32_t mndProcessMDropSmaReq(SRpcMsg *pReq); static int32_t mndProcessMDropSmaReq(SRpcMsg *pReq);
static int32_t mndProcessGetSmaReq(SRpcMsg *pReq); static int32_t mndProcessGetSmaReq(SRpcMsg *pReq);
static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq);
static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextSma(SMnode *pMnode, void *pIter); static void mndCancelGetNextSma(SMnode *pMnode, void *pIter);
@ -59,6 +60,7 @@ int32_t mndInitSma(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_VND_CREATE_SMA_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_CREATE_SMA_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_SMA_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_SMA_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_GET_INDEX, mndProcessGetSmaReq); mndSetMsgHandle(pMnode, TDMT_MND_GET_INDEX, mndProcessGetSmaReq);
mndSetMsgHandle(pMnode, TDMT_MND_GET_TABLE_INDEX, mndProcessGetTbSmaReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndRetrieveSma); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndRetrieveSma);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndCancelGetNextSma); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndCancelGetNextSma);
@ -870,6 +872,55 @@ static int32_t mndGetSma(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp
return code; return code;
} }
static int32_t mndGetTableSma(SMnode *pMnode, STableIndexReq *indexReq, STableIndexRsp *rsp, bool *exist) {
int32_t code = 0;
SSmaObj *pSma = NULL;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
STableIndexInfo info;
while (1) {
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
if (pIter == NULL) break;
if (pSma->stb[0] != indexReq->tbFName[0] || strcmp(pSma->stb, indexReq->tbFName)) {
continue;
}
info.intervalUnit = pSma->intervalUnit;
info.slidingUnit = pSma->slidingUnit;
info.interval = pSma->interval;
info.offset = pSma->offset;
info.sliding = pSma->sliding;
info.dstTbUid = pSma->dstTbUid;
info.dstVgId = pSma->dstVgId;
info.expr = taosMemoryMalloc(pSma->exprLen + 1);
if (info.expr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1;
sdbRelease(pSdb, pSma);
return code;
}
memcpy(info.expr, pSma->expr, pSma->exprLen);
info.expr[pSma->exprLen] = 0;
if (NULL == taosArrayPush(rsp->pIndex, &info)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1;
taosMemoryFree(info.expr);
sdbRelease(pSdb, pSma);
return code;
}
*exist = true;
sdbRelease(pSdb, pSma);
}
return code;
}
static int32_t mndProcessGetSmaReq(SRpcMsg *pReq) { static int32_t mndProcessGetSmaReq(SRpcMsg *pReq) {
SUserIndexReq indexReq = {0}; SUserIndexReq indexReq = {0};
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
@ -916,6 +967,59 @@ _OVER:
return code; return code;
} }
static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq) {
STableIndexReq indexReq = {0};
SMnode *pMnode = pReq->info.node;
int32_t code = -1;
STableIndexRsp rsp = {0};
bool exist = false;
if (tDeserializeSTableIndexReq(pReq->pCont, pReq->contLen, &indexReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
rsp.pIndex = taosArrayInit(10, sizeof(STableIndexInfo));
if (NULL == rsp.pIndex) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1;
goto _OVER;
}
code = mndGetTableSma(pMnode, &indexReq, &rsp, &exist);
if (code) {
goto _OVER;
}
if (!exist) {
code = -1;
terrno = TSDB_CODE_MND_DB_INDEX_NOT_EXIST;
} else {
int32_t contLen = tSerializeSTableIndexRsp(NULL, 0, &rsp);
void *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1;
goto _OVER;
}
tSerializeSTableIndexRsp(pRsp, contLen, &rsp);
pReq->info.rsp = pRsp;
pReq->info.rspLen = contLen;
code = 0;
}
_OVER:
if (code != 0) {
mError("failed to get table index %s since %s", indexReq.tbFName, terrstr());
}
return code;
}
static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;

View File

@ -54,8 +54,8 @@ int32_t mndInitStream(SMnode *pMnode) {
}; };
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq); mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
mndSetMsgHandle(pMnode, TDMT_VND_TASK_DEPLOY_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_SND_TASK_DEPLOY_RSP, mndTransProcessRsp); /*mndSetMsgHandle(pMnode, TDMT_SND_TASK_DEPLOY_RSP, mndTransProcessRsp);*/
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);*/ /*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);*/
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/ /*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/

View File

@ -90,7 +90,7 @@ void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
// stream deploy // stream deploy
// stream stop/resume // stream stop/resume
// operator exec // operator exec
if (pMsg->msgType == TDMT_SND_TASK_DEPLOY) { if (pMsg->msgType == TDMT_STREAM_TASK_DEPLOY) {
void *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); void *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
SStreamTask *pTask = taosMemoryMalloc(sizeof(SStreamTask)); SStreamTask *pTask = taosMemoryMalloc(sizeof(SStreamTask));
if (pTask == NULL) { if (pTask == NULL) {

View File

@ -47,15 +47,16 @@ int32_t tdUpdateExpireWindow(SSma* pSma, const SSubmitReq* pMsg, int64_t version
int32_t tdGetTSmaData(SSma* pSma, char* pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult) { int32_t tdGetTSmaData(SSma* pSma, char* pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if ((code = tdGetTSmaDataImpl(pSma, pData, indexUid, querySKey, nMaxResult)) < 0) { if ((code = tdGetTSmaDataImpl(pSma, pData, indexUid, querySKey, nMaxResult)) < 0) {
smaWarn("vgId:%d, get tSma data failed since %s", SMA_VID(pSma), tstrerror(terrno)); smaWarn("vgId:%d, get tsma data failed since %s", SMA_VID(pSma), tstrerror(terrno));
} }
return code; return code;
} }
int32_t smaGetTSmaDays(SVnodeCfg* pCfg, void* pCont, uint32_t contLen, int32_t *days) { int32_t smaGetTSmaDays(SVnodeCfg* pCfg, void* pCont, uint32_t contLen, int32_t* days) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if ((code = tdGetTSmaDaysImpl(pCfg, pCont, contLen, days)) < 0) { if ((code = tdGetTSmaDaysImpl(pCfg, pCont, contLen, days)) < 0) {
smaWarn("vgId:%d get tSma days failed since %s", pCfg->vgId, tstrerror(terrno)); smaWarn("vgId:%d, get tsma days failed since %s", pCfg->vgId, tstrerror(terrno));
} }
smaDebug("vgId:%d, get tsma days %d", pCfg->vgId, *days);
return code; return code;
} }

View File

@ -97,12 +97,16 @@ int32_t tdGetTSmaDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_
goto _err; goto _err;
} }
STsdbCfg *pTsdbCfg = &pCfg->tsdbCfg; STsdbCfg *pTsdbCfg = &pCfg->tsdbCfg;
int64_t mInterval = convertTimeFromPrecisionToUnit(tsma.interval, pTsdbCfg->precision, TIME_UNIT_MINUTE); int64_t sInterval = convertTimeFromPrecisionToUnit(tsma.interval, pTsdbCfg->precision, TIME_UNIT_SECOND);
int64_t records = pTsdbCfg->days / mInterval; if (sInterval <= 0) {
*days = pTsdbCfg->days;
return 0;
}
int64_t records = pTsdbCfg->days * 60 / sInterval;
if (records >= SMA_STORAGE_SPLIT_FACTOR) { if (records >= SMA_STORAGE_SPLIT_FACTOR) {
*days = pTsdbCfg->days; *days = pTsdbCfg->days;
} else { } else {
int64_t mInterval = convertTimeFromPrecisionToUnit(tsma.interval, pTsdbCfg->precision, TIME_UNIT_MINUTE);
int64_t daysPerFile = mInterval * SMA_STORAGE_MINUTES_DAY * 2; int64_t daysPerFile = mInterval * SMA_STORAGE_MINUTES_DAY * 2;
if (daysPerFile > SMA_STORAGE_MINUTES_MAX) { if (daysPerFile > SMA_STORAGE_MINUTES_MAX) {
@ -111,7 +115,7 @@ int32_t tdGetTSmaDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_
*days = (int32_t)daysPerFile; *days = (int32_t)daysPerFile;
} }
if(*days < pTsdbCfg->days) { if (*days < pTsdbCfg->days) {
*days = pTsdbCfg->days; *days = pTsdbCfg->days;
} }
} }

View File

@ -349,8 +349,9 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
} }
// sink // sink
pTask->ahandle = pTq->pVnode; /*pTask->ahandle = pTq->pVnode;*/
if (pTask->sinkType == TASK_SINK__SMA) { if (pTask->sinkType == TASK_SINK__SMA) {
pTask->smaSink.vnode = pTq->pVnode;
pTask->smaSink.smaSink = smaHandleRes; pTask->smaSink.smaSink = smaHandleRes;
} else if (pTask->sinkType == TASK_SINK__TABLE) { } else if (pTask->sinkType == TASK_SINK__TABLE) {
pTask->tbSink.vnode = pTq->pVnode; pTask->tbSink.vnode = pTq->pVnode;

View File

@ -158,7 +158,7 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
// TODO: handle error // TODO: handle error
} }
break; break;
case TDMT_VND_TASK_DEPLOY: { case TDMT_STREAM_TASK_DEPLOY: {
if (tqProcessTaskDeploy(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), if (tqProcessTaskDeploy(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) { pMsg->contLen - sizeof(SMsgHead)) < 0) {
} }
@ -238,21 +238,19 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
case TDMT_VND_TABLE_META: case TDMT_VND_TABLE_META:
return vnodeGetTableMeta(pVnode, pMsg); return vnodeGetTableMeta(pVnode, pMsg);
case TDMT_VND_CONSUME: case TDMT_VND_CONSUME:
return tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId); return tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId);
case TDMT_VND_TASK_RUN: { case TDMT_STREAM_TASK_RUN:
int32_t code = tqProcessTaskRunReq(pVnode->pTq, pMsg); return tqProcessTaskRunReq(pVnode->pTq, pMsg);
pMsg->pCont = NULL; case TDMT_STREAM_TASK_DISPATCH:
return code;
}
case TDMT_VND_TASK_DISPATCH:
return tqProcessTaskDispatchReq(pVnode->pTq, pMsg); return tqProcessTaskDispatchReq(pVnode->pTq, pMsg);
case TDMT_VND_TASK_RECOVER: case TDMT_STREAM_TASK_RECOVER:
return tqProcessTaskRecoverReq(pVnode->pTq, pMsg); return tqProcessTaskRecoverReq(pVnode->pTq, pMsg);
case TDMT_VND_TASK_DISPATCH_RSP: case TDMT_STREAM_TASK_DISPATCH_RSP:
return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg); return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
case TDMT_VND_TASK_RECOVER_RSP: case TDMT_STREAM_TASK_RECOVER_RSP:
return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg); return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);
default: default:
vError("unknown msg type:%d in fetch queue", pMsg->msgType); vError("unknown msg type:%d in fetch queue", pMsg->msgType);
@ -262,9 +260,9 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRpcMsg *pRsp) { int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRpcMsg *pRsp) {
vTrace("message in write queue is processing"); vTrace("message in write queue is processing");
char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SDeleteRes res = {0}; SDeleteRes res = {0};
SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
switch (pMsg->msgType) { switch (pMsg->msgType) {

View File

@ -310,22 +310,19 @@ typedef struct SCtgCacheOperation {
int32_t opId; int32_t opId;
void *data; void *data;
bool syncOp; bool syncOp;
uint64_t seqId; tsem_t rspSem;
} SCtgCacheOperation; } SCtgCacheOperation;
typedef struct SCtgQNode { typedef struct SCtgQNode {
SCtgCacheOperation op; SCtgCacheOperation *op;
struct SCtgQNode *next; struct SCtgQNode *next;
} SCtgQNode; } SCtgQNode;
typedef struct SCtgQueue { typedef struct SCtgQueue {
SRWLatch qlock; SRWLatch qlock;
uint64_t seqId;
uint64_t seqDone;
SCtgQNode *head; SCtgQNode *head;
SCtgQNode *tail; SCtgQNode *tail;
tsem_t reqSem; tsem_t reqSem;
tsem_t rspSem;
uint64_t qRemainNum; uint64_t qRemainNum;
} SCtgQueue; } SCtgQueue;
@ -493,6 +490,7 @@ int32_t ctgGetDBVgInfoFromMnode(CTG_PARAMS, SBuildUseDBInput *input, SUseDbOutpu
int32_t ctgGetQnodeListFromMnode(CTG_PARAMS, SArray *out, SCtgTask* pTask); int32_t ctgGetQnodeListFromMnode(CTG_PARAMS, SArray *out, SCtgTask* pTask);
int32_t ctgGetDBCfgFromMnode(CTG_PARAMS, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask); int32_t ctgGetDBCfgFromMnode(CTG_PARAMS, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask);
int32_t ctgGetIndexInfoFromMnode(CTG_PARAMS, const char *indexName, SIndexInfo *out, SCtgTask* pTask); int32_t ctgGetIndexInfoFromMnode(CTG_PARAMS, const char *indexName, SIndexInfo *out, SCtgTask* pTask);
int32_t ctgGetTbIndexFromMnode(CTG_PARAMS, const char *tbFName, SArray** out, SCtgTask* pTask);
int32_t ctgGetUdfInfoFromMnode(CTG_PARAMS, const char *funcName, SFuncInfo *out, SCtgTask* pTask); int32_t ctgGetUdfInfoFromMnode(CTG_PARAMS, const char *funcName, SFuncInfo *out, SCtgTask* pTask);
int32_t ctgGetUserDbAuthFromMnode(CTG_PARAMS, const char *user, SGetUserAuthRsp *out, SCtgTask* pTask); int32_t ctgGetUserDbAuthFromMnode(CTG_PARAMS, const char *user, SGetUserAuthRsp *out, SCtgTask* pTask);
int32_t ctgGetTbMetaFromMnodeImpl(CTG_PARAMS, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTask* pTask); int32_t ctgGetTbMetaFromMnodeImpl(CTG_PARAMS, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTask* pTask);

View File

@ -506,11 +506,6 @@ int32_t catalogInit(SCatalogCfg *cfg) {
CTG_ERR_RET(TSDB_CODE_CTG_SYS_ERROR); CTG_ERR_RET(TSDB_CODE_CTG_SYS_ERROR);
} }
if (tsem_init(&gCtgMgmt.queue.rspSem, 0, 0)) {
qError("tsem_init failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
CTG_ERR_RET(TSDB_CODE_CTG_SYS_ERROR);
}
gCtgMgmt.queue.head = taosMemoryCalloc(1, sizeof(SCtgQNode)); gCtgMgmt.queue.head = taosMemoryCalloc(1, sizeof(SCtgQNode));
if (NULL == gCtgMgmt.queue.head) { if (NULL == gCtgMgmt.queue.head) {
qError("calloc %d failed", (int32_t)sizeof(SCtgQNode)); qError("calloc %d failed", (int32_t)sizeof(SCtgQNode));
@ -1141,6 +1136,17 @@ int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps
CTG_API_LEAVE(ctgGetIndexInfoFromMnode(CTG_PARAMS_LIST(), indexName, pInfo, NULL)); CTG_API_LEAVE(ctgGetIndexInfoFromMnode(CTG_PARAMS_LIST(), indexName, pInfo, NULL));
} }
int32_t catalogGetTableIndex(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* tbFName, SArray** pRes) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == tbFName || NULL == pRes) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
CTG_API_LEAVE(ctgGetTbIndexFromMnode(CTG_PARAMS_LIST(), tbFName, pRes, NULL));
}
int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo) { int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo) {
CTG_API_ENTER(); CTG_API_ENTER();
@ -1195,10 +1201,6 @@ void catalogDestroy(void) {
qError("tsem_post failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno))); qError("tsem_post failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
} }
if (tsem_post(&gCtgMgmt.queue.rspSem)) {
qError("tsem_post failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
}
while (CTG_IS_LOCKED(&gCtgMgmt.lock)) { while (CTG_IS_LOCKED(&gCtgMgmt.lock)) {
taosUsleep(1); taosUsleep(1);
} }

View File

@ -501,25 +501,6 @@ _return:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void ctgWaitOpDone(SCtgCacheOperation *action) {
while (true) {
tsem_wait(&gCtgMgmt.queue.rspSem);
if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) {
tsem_post(&gCtgMgmt.queue.rspSem);
break;
}
if (gCtgMgmt.queue.seqDone >= action->seqId) {
break;
}
tsem_post(&gCtgMgmt.queue.rspSem);
sched_yield();
}
}
void ctgDequeue(SCtgCacheOperation **op) { void ctgDequeue(SCtgCacheOperation **op) {
SCtgQNode *orig = gCtgMgmt.queue.head; SCtgQNode *orig = gCtgMgmt.queue.head;
@ -530,7 +511,7 @@ void ctgDequeue(SCtgCacheOperation **op) {
taosMemoryFreeClear(orig); taosMemoryFreeClear(orig);
*op = &node->op; *op = node->op;
} }
@ -541,9 +522,11 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
CTG_RET(TSDB_CODE_CTG_MEM_ERROR); CTG_RET(TSDB_CODE_CTG_MEM_ERROR);
} }
operation->seqId = atomic_add_fetch_64(&gCtgMgmt.queue.seqId, 1); if (operation->syncOp) {
tsem_init(&operation->rspSem, 0, 0);
}
node->op = *operation; node->op = operation;
CTG_LOCK(CTG_WRITE, &gCtgMgmt.queue.qlock); CTG_LOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
gCtgMgmt.queue.tail->next = node; gCtgMgmt.queue.tail->next = node;
@ -558,7 +541,8 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
ctgDebug("action [%s] added into queue", gCtgCacheOperation[operation->opId].name); ctgDebug("action [%s] added into queue", gCtgCacheOperation[operation->opId].name);
if (operation->syncOp) { if (operation->syncOp) {
ctgWaitOpDone(operation); tsem_wait(&operation->rspSem);
taosMemoryFree(operation);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -567,7 +551,9 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) { int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_DROP_DB_CACHE}; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_DROP_DB_CACHE;
SCtgDropDBMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDBMsg)); SCtgDropDBMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDBMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDBMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDBMsg));
@ -583,21 +569,24 @@ int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId)
strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
msg->dbId = dbId; msg->dbId = dbId;
action.data = msg; op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return: _return:
taosMemoryFreeClear(action.data); taosMemoryFreeClear(op->data);
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncOp) { int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncOp) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_DROP_DB_VGROUP, .syncOp = syncOp}; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_DROP_DB_VGROUP;
op->syncOp = syncOp;
SCtgDropDbVgroupMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDbVgroupMsg)); SCtgDropDbVgroupMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDbVgroupMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDbVgroupMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDbVgroupMsg));
@ -612,15 +601,15 @@ int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncOp)
msg->pCtg = pCtg; msg->pCtg = pCtg;
strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
action.data = msg; op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return: _return:
taosMemoryFreeClear(action.data); taosMemoryFreeClear(op->data);
CTG_RET(code); CTG_RET(code);
} }
@ -628,7 +617,10 @@ _return:
int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncOp) { int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncOp) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_DROP_STB_META, .syncOp = syncOp}; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_DROP_STB_META;
op->syncOp = syncOp;
SCtgDropStbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropStbMetaMsg)); SCtgDropStbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropStbMetaMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropStbMetaMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropStbMetaMsg));
@ -641,15 +633,15 @@ int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId,
msg->dbId = dbId; msg->dbId = dbId;
msg->suid = suid; msg->suid = suid;
action.data = msg; op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return: _return:
taosMemoryFreeClear(action.data); taosMemoryFreeClear(op->data);
CTG_RET(code); CTG_RET(code);
} }
@ -657,7 +649,10 @@ _return:
int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncOp) { int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncOp) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_DROP_TB_META, .syncOp = syncOp}; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_DROP_TB_META;
op->syncOp = syncOp;
SCtgDropTblMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTblMetaMsg)); SCtgDropTblMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTblMetaMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTblMetaMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTblMetaMsg));
@ -669,21 +664,24 @@ int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId,
strncpy(msg->tbName, tbName, sizeof(msg->tbName)); strncpy(msg->tbName, tbName, sizeof(msg->tbName));
msg->dbId = dbId; msg->dbId = dbId;
action.data = msg; op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return: _return:
taosMemoryFreeClear(action.data); taosMemoryFreeClear(op->data);
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, SDBVgInfo* dbInfo, bool syncOp) { int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, SDBVgInfo* dbInfo, bool syncOp) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_UPDATE_VGROUP, .syncOp = syncOp}; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_UPDATE_VGROUP;
op->syncOp = syncOp;
SCtgUpdateVgMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateVgMsg)); SCtgUpdateVgMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateVgMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg));
@ -701,22 +699,25 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId
msg->dbId = dbId; msg->dbId = dbId;
msg->dbInfo = dbInfo; msg->dbInfo = dbInfo;
action.data = msg; op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return: _return:
ctgFreeVgInfo(dbInfo); ctgFreeVgInfo(dbInfo);
taosMemoryFreeClear(action.data); taosMemoryFreeClear(op->data);
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool syncOp) { int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool syncOp) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_UPDATE_TB_META, .syncOp = syncOp}; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_UPDATE_TB_META;
op->syncOp = syncOp;
SCtgUpdateTblMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTblMsg)); SCtgUpdateTblMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTblMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg));
@ -731,9 +732,9 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool sy
msg->pCtg = pCtg; msg->pCtg = pCtg;
msg->output = output; msg->output = output;
action.data = msg; op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -746,7 +747,9 @@ _return:
int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEpSet* pEpSet) { int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEpSet* pEpSet) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation operation= {.opId = CTG_OP_UPDATE_VG_EPSET}; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_UPDATE_VG_EPSET;
SCtgUpdateEpsetMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateEpsetMsg)); SCtgUpdateEpsetMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateEpsetMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateEpsetMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateEpsetMsg));
@ -758,9 +761,9 @@ int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEp
msg->vgId = vgId; msg->vgId = vgId;
msg->epSet = *pEpSet; msg->epSet = *pEpSet;
operation.data = msg; op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &operation)); CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -775,7 +778,10 @@ _return:
int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncOp) { int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncOp) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_UPDATE_USER, .syncOp = syncOp}; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_UPDATE_USER;
op->syncOp = syncOp;
SCtgUpdateUserMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateUserMsg)); SCtgUpdateUserMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateUserMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateUserMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateUserMsg));
@ -785,9 +791,9 @@ int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncOp
msg->pCtg = pCtg; msg->pCtg = pCtg;
msg->userAuth = *pAuth; msg->userAuth = *pAuth;
action.data = msg; op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1607,6 +1613,39 @@ void ctgUpdateThreadUnexpectedStopped(void) {
if (CTG_IS_LOCKED(&gCtgMgmt.lock) > 0) CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock); if (CTG_IS_LOCKED(&gCtgMgmt.lock) > 0) CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock);
} }
void ctgCleanupCacheQueue(void) {
SCtgQNode *node = NULL;
SCtgQNode *nodeNext = NULL;
while (true) {
node = gCtgMgmt.queue.head->next;
while (node) {
if (node->op) {
taosMemoryFree(node->op->data);
if (node->op->syncOp) {
tsem_post(&node->op->rspSem);
} else {
taosMemoryFree(node->op);
}
}
nodeNext = node->next;
taosMemoryFree(node);
node = nodeNext;
}
if (CTG_IS_LOCKED(&gCtgMgmt.lock)) {
taosUsleep(1);
} else {
break;
}
}
taosMemoryFreeClear(gCtgMgmt.queue.head);
gCtgMgmt.queue.tail = NULL;
}
void* ctgUpdateThreadFunc(void* param) { void* ctgUpdateThreadFunc(void* param) {
setThreadName("catalog"); setThreadName("catalog");
#ifdef WINDOWS #ifdef WINDOWS
@ -1622,7 +1661,8 @@ void* ctgUpdateThreadFunc(void* param) {
} }
if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) {
tsem_post(&gCtgMgmt.queue.rspSem); CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock);
ctgCleanupCacheQueue();
break; break;
} }
@ -1634,10 +1674,8 @@ void* ctgUpdateThreadFunc(void* param) {
(*gCtgCacheOperation[operation->opId].func)(operation); (*gCtgCacheOperation[operation->opId].func)(operation);
gCtgMgmt.queue.seqDone = operation->seqId;
if (operation->syncOp) { if (operation->syncOp) {
tsem_post(&gCtgMgmt.queue.rspSem); tsem_post(&operation->rspSem);
} }
CTG_RT_STAT_INC(qDoneNum, 1); CTG_RT_STAT_INC(qDoneNum, 1);

View File

@ -85,6 +85,21 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
qDebug("Got index from mnode, indexName:%s", target); qDebug("Got index from mnode, indexName:%s", target);
break; break;
} }
case TDMT_MND_GET_TABLE_INDEX: {
if (TSDB_CODE_SUCCESS != rspCode) {
qError("error rsp for get table index, error:%s, tbFName:%s", tstrerror(rspCode), target);
CTG_ERR_RET(rspCode);
}
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) {
qError("Process get table index rsp failed, error:%s, tbFName:%s", tstrerror(code), target);
CTG_ERR_RET(code);
}
qDebug("Got table index from mnode, tbFName:%s", target);
break;
}
case TDMT_MND_RETRIEVE_FUNC: { case TDMT_MND_RETRIEVE_FUNC: {
if (TSDB_CODE_SUCCESS != rspCode) { if (TSDB_CODE_SUCCESS != rspCode) {
qError("error rsp for get udf, error:%s, funcName:%s", tstrerror(rspCode), target); qError("error rsp for get udf, error:%s, funcName:%s", tstrerror(rspCode), target);
@ -412,6 +427,44 @@ int32_t ctgGetIndexInfoFromMnode(CTG_PARAMS, const char *indexName, SIndexInfo *
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetTbIndexFromMnode(CTG_PARAMS, const char *tbFName, SArray** out, SCtgTask* pTask) {
char *msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_TABLE_INDEX;
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get tb index from mnode, tbFName:%s", tbFName);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)tbFName, &msg, 0, &msgLen, mallocFp);
if (code) {
ctgError("Build get index msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
CTG_ERR_RET(code);
}
if (pTask) {
void* pOut = taosMemoryCalloc(1, POINTER_BYTES);
if (NULL == pOut) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)tbFName));
CTG_RET(ctgAsyncSendMsg(CTG_PARAMS_LIST(), pTask, reqType, msg, msgLen));
}
SRpcMsg rpcMsg = {
.msgType = reqType,
.pCont = msg,
.contLen = msgLen,
};
SRpcMsg rpcRsp = {0};
rpcSendRecv(pTrans, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName));
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetUdfInfoFromMnode(CTG_PARAMS, const char *funcName, SFuncInfo *out, SCtgTask* pTask) { int32_t ctgGetUdfInfoFromMnode(CTG_PARAMS, const char *funcName, SFuncInfo *out, SCtgTask* pTask) {
char *msg = NULL; char *msg = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;

View File

@ -1,3 +1,17 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executorimpl.h" #include "executorimpl.h"
#include "function.h" #include "function.h"
#include "functionMgt.h" #include "functionMgt.h"

View File

@ -203,6 +203,24 @@ int32_t queryBuildGetUserAuthMsg(void *input, char **msg, int32_t msgSize, int32
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t queryBuildGetTbIndexMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) {
if (NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
STableIndexReq indexReq = {0};
strcpy(indexReq.tbFName, input);
int32_t bufLen = tSerializeSTableIndexReq(NULL, 0, &indexReq);
void *pBuf = (*mallcFp)(bufLen);
tSerializeSTableIndexReq(pBuf, bufLen, &indexReq);
*msg = pBuf;
*msgLen = bufLen;
return TSDB_CODE_SUCCESS;
}
int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) { int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) {
SUseDbOutput *pOut = output; SUseDbOutput *pOut = output;
@ -459,26 +477,43 @@ int32_t queryProcessGetUserAuthRsp(void *output, char *msg, int32_t msgSize) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t queryProcessGetTbIndexRsp(void *output, char *msg, int32_t msgSize) {
if (NULL == output || NULL == msg || msgSize <= 0) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
STableIndexRsp out = {0};
if (tDeserializeSTableIndexRsp(msg, msgSize, &out) != 0) {
qError("tDeserializeSTableIndexRsp failed, msgSize:%d", msgSize);
return TSDB_CODE_INVALID_MSG;
}
*(void **)output = out.pIndex;
return TSDB_CODE_SUCCESS;
}
void initQueryModuleMsgHandle() { void initQueryModuleMsgHandle() {
queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg; queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryBuildTableMetaReqMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryBuildTableMetaReqMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)] = queryBuildUseDbMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)] = queryBuildUseDbMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryBuildQnodeListMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryBuildQnodeListMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryBuildGetDBCfgMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryBuildGetDBCfgMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryBuildGetIndexMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryBuildGetIndexMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryBuildRetrieveFuncMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryBuildRetrieveFuncMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_USER_AUTH)] = queryBuildGetUserAuthMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_USER_AUTH)] = queryBuildGetUserAuthMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_TABLE_INDEX)] = queryBuildGetTbIndexMsg;
queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryProcessQnodeListRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryProcessQnodeListRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryProcessGetDbCfgRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryProcessGetDbCfgRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryProcessGetIndexRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryProcessGetIndexRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryProcessRetrieveFuncRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryProcessRetrieveFuncRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_USER_AUTH)] = queryProcessGetUserAuthRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_USER_AUTH)] = queryProcessGetUserAuthRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_TABLE_INDEX)] = queryProcessGetTbIndexRsp;
} }
#pragma GCC diagnostic pop #pragma GCC diagnostic pop

View File

@ -356,7 +356,7 @@ int32_t qwOpenRef(void);
void qwSetHbParam(int64_t refId, SQWHbParam **pParam); void qwSetHbParam(int64_t refId, SQWHbParam **pParam);
int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type); int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type);
int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type); int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type);
void qwClearExpiredSch(SArray* pExpiredSch); void qwClearExpiredSch(SQWorker *mgmt, SArray* pExpiredSch);
int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch); int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch);
void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx); void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx);

View File

@ -539,8 +539,23 @@ int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) {
} }
void qwClearExpiredSch(SArray* pExpiredSch) { void qwClearExpiredSch(SQWorker *mgmt, SArray* pExpiredSch) {
int32_t num = taosArrayGetSize(pExpiredSch);
for (int32_t i = 0; i < num; ++i) {
uint64_t *sId = taosArrayGet(pExpiredSch, i);
SQWSchStatus *pSch = NULL;
if (qwAcquireScheduler(mgmt, *sId, QW_WRITE, &pSch)) {
continue;
}
if (taosHashGetSize(pSch->tasksHash) <= 0) {
qwDestroySchStatus(pSch);
taosHashRemove(mgmt->schHash, sId, sizeof(*sId));
qError("sch %" PRIx64 "destroyed", *sId);
}
qwReleaseScheduler(QW_WRITE, mgmt);
}
} }

View File

@ -790,9 +790,10 @@ int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
} }
QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch)); QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));
QW_ERR_JRET(qwRegisterHbBrokenLinkArg(mgmt, req->sId, &qwMsg->connInfo)); QW_ERR_JRET(qwRegisterHbBrokenLinkArg(mgmt, req->sId, &qwMsg->connInfo));
sch->hbBrokenTs = 0;
QW_LOCK(QW_WRITE, &sch->hbConnLock); QW_LOCK(QW_WRITE, &sch->hbConnLock);
if (sch->hbConnInfo.handle) { if (sch->hbConnInfo.handle) {
@ -912,7 +913,7 @@ _return:
} }
if (taosArrayGetSize(pExpiredSch) > 0) { if (taosArrayGetSize(pExpiredSch) > 0) {
qwClearExpiredSch(pExpiredSch); qwClearExpiredSch(mgmt, pExpiredSch);
} }
taosMemoryFreeClear(rspList); taosMemoryFreeClear(rspList);

View File

@ -373,9 +373,6 @@ static FORCE_INLINE void ncharToVar(char* buf, SScalarParam* pOut, int32_t rowIn
//TODO opt performance, tmp is not needed. //TODO opt performance, tmp is not needed.
int32_t vectorConvertFromVarData(const SScalarParam* pIn, SScalarParam* pOut, int32_t inType, int32_t outType) { int32_t vectorConvertFromVarData(const SScalarParam* pIn, SScalarParam* pOut, int32_t inType, int32_t outType) {
int32_t bufSize = pIn->columnData->info.bytes;
char *tmp = taosMemoryMalloc(bufSize + VARSTR_HEADER_SIZE);
bool vton = false; bool vton = false;
_bufConverteFunc func = NULL; _bufConverteFunc func = NULL;
@ -423,6 +420,12 @@ int32_t vectorConvertFromVarData(const SScalarParam* pIn, SScalarParam* pOut, in
continue; continue;
} }
} }
int32_t bufSize = pIn->columnData->info.bytes;
char *tmp = taosMemoryMalloc(varDataTLen(data));
if(!tmp){
sclError("out of memory in vectorConvertFromVarData");
return TSDB_CODE_OUT_OF_MEMORY;
}
if (vton) { if (vton) {
memcpy(tmp, data, varDataTLen(data)); memcpy(tmp, data, varDataTLen(data));
} else { } else {
@ -444,9 +447,9 @@ int32_t vectorConvertFromVarData(const SScalarParam* pIn, SScalarParam* pOut, in
} }
(*func)(tmp, pOut, i); (*func)(tmp, pOut, i);
taosMemoryFreeClear(tmp);
} }
taosMemoryFreeClear(tmp);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1674,6 +1677,14 @@ void vectorNotNull(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut
void vectorIsTrue(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord) { void vectorIsTrue(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord) {
vectorConvertImpl(pLeft, pOut); vectorConvertImpl(pLeft, pOut);
for(int32_t i = 0; i < pOut->numOfRows; ++i) {
if(colDataIsNull_s(pOut->columnData, i)) {
int8_t v = 0;
colDataAppendInt8(pOut->columnData, i, &v);
colDataSetNotNull_f(pOut->columnData->nullbitmap, i);
}
}
pOut->columnData->hasNull = false;
} }
STagVal getJsonValue(char *json, char *key, bool *isExist) { STagVal getJsonValue(char *json, char *key, bool *isExist) {

File diff suppressed because it is too large Load Diff

View File

@ -207,6 +207,7 @@ typedef struct SSchJob {
SArray *dataSrcTasks; // SArray<SQueryTask*> SArray *dataSrcTasks; // SArray<SQueryTask*>
int32_t levelIdx; int32_t levelIdx;
SEpSet dataSrcEps; SEpSet dataSrcEps;
SHashObj *taskList;
SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask* SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask*
SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask* SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask*
SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask* SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask*

View File

@ -64,6 +64,13 @@ int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *pTrans, SArray *pN
pJob->nodeList = taosArrayDup(pNodeList); pJob->nodeList = taosArrayDup(pNodeList);
} }
pJob->taskList =
taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == pJob->taskList) {
SCH_JOB_ELOG("taosHashInit %d taskList failed", pDag->numOfSubplans);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_ERR_JRET(schValidateAndBuildJob(pDag, pJob)); SCH_ERR_JRET(schValidateAndBuildJob(pDag, pJob));
if (SCH_IS_EXPLAIN_JOB(pJob)) { if (SCH_IS_EXPLAIN_JOB(pJob)) {
@ -486,23 +493,26 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
SCH_SET_JOB_TYPE(pJob, plan->subplanType); SCH_SET_JOB_TYPE(pJob, plan->subplanType);
SSchTask task = {0}; SSchTask task = {0};
SSchTask *pTask = &task;
SCH_ERR_JRET(schInitTask(pJob, &task, plan, pLevel)); SCH_ERR_JRET(schInitTask(pJob, &task, plan, pLevel));
void *p = taosArrayPush(pLevel->subTasks, &task); SSchTask *pTask = taosArrayPush(pLevel->subTasks, &task);
if (NULL == p) { if (NULL == pTask) {
SCH_TASK_ELOG("taosArrayPush task to level failed, level:%d, taskIdx:%d", pLevel->level, n); SCH_TASK_ELOG("taosArrayPush task to level failed, level:%d, taskIdx:%d", pLevel->level, n);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
SCH_ERR_JRET(schRecordQueryDataSrc(pJob, p)); SCH_ERR_JRET(schRecordQueryDataSrc(pJob, pTask));
if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &p, POINTER_BYTES)) { if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &pTask, POINTER_BYTES)) {
SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d", n); SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d", n);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
if (0 != taosHashPut(pJob->taskList, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES)) {
SCH_TASK_ELOG("taosHashPut to taskList failed, taskIdx:%d", n);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
++pJob->taskNum; ++pJob->taskNum;
} }
@ -1276,14 +1286,10 @@ int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTas
} }
int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask) { int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask) {
schGetTaskFromList(pJob->execTasks, taskId, pTask); schGetTaskFromList(pJob->taskList, taskId, pTask);
if (NULL == *pTask) { if (NULL == *pTask) {
schGetTaskFromList(pJob->succTasks, taskId, pTask); SCH_JOB_ELOG("task not found in job task list, taskId:%" PRIx64, taskId);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
if (NULL == *pTask) {
SCH_JOB_ELOG("task not found in execList & succList, taskId:%" PRIx64, taskId);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1445,6 +1451,7 @@ void schFreeJobImpl(void *job) {
taosHashCleanup(pJob->execTasks); taosHashCleanup(pJob->execTasks);
taosHashCleanup(pJob->failTasks); taosHashCleanup(pJob->failTasks);
taosHashCleanup(pJob->succTasks); taosHashCleanup(pJob->succTasks);
taosHashCleanup(pJob->taskList);
taosArrayDestroy(pJob->levels); taosArrayDestroy(pJob->levels);
taosArrayDestroy(pJob->nodeList); taosArrayDestroy(pJob->nodeList);

View File

@ -24,10 +24,9 @@ extern "C" {
#endif #endif
int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb); int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb);
int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb); int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb);
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* data);
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData); int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData);
int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -26,7 +26,7 @@ int32_t streamTriggerByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb) {
pRunReq->streamId = pTask->streamId; pRunReq->streamId = pTask->streamId;
pRunReq->taskId = pTask->taskId; pRunReq->taskId = pTask->taskId;
SRpcMsg msg = { SRpcMsg msg = {
.msgType = TDMT_VND_TASK_RUN, .msgType = TDMT_STREAM_TASK_RUN,
.pCont = pRunReq, .pCont = pRunReq,
.contLen = sizeof(SStreamTaskRunReq), .contLen = sizeof(SStreamTaskRunReq),
}; };
@ -83,7 +83,9 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDisp
// 3. handle output // 3. handle output
// 3.1 check and set status // 3.1 check and set status
// 3.2 dispatch / sink // 3.2 dispatch / sink
streamSink1(pTask, pMsgCb); if (pTask->dispatchType != TASK_DISPATCH__NONE) {
streamDispatch(pTask, pMsgCb);
}
return 0; return 0;
} }
@ -97,13 +99,17 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDisp
return 0; return 0;
} }
// continue dispatch // continue dispatch
streamSink1(pTask, pMsgCb); if (pTask->dispatchType != TASK_DISPATCH__NONE) {
streamDispatch(pTask, pMsgCb);
}
return 0; return 0;
} }
int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb) { int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb) {
streamExec(pTask, pMsgCb); streamExec(pTask, pMsgCb);
streamSink1(pTask, pMsgCb); if (pTask->dispatchType != TASK_DISPATCH__NONE) {
streamDispatch(pTask, pMsgCb);
}
return 0; return 0;
} }

View File

@ -86,22 +86,3 @@ SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit) {
memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit)); memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit));
return pSubmitClone; return pSubmitClone;
} }
#if 0
int32_t tEncodeSStreamTaskExecReq(void** buf, const SStreamTaskExecReq* pReq) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pReq->streamId);
tlen += taosEncodeFixedI32(buf, pReq->taskId);
tlen += tEncodeDataBlocks(buf, pReq->data);
return tlen;
}
void* tDecodeSStreamTaskExecReq(const void* buf, SStreamTaskExecReq* pReq) {
buf = taosDecodeFixedI64(buf, &pReq->streamId);
buf = taosDecodeFixedI32(buf, &pReq->taskId);
buf = tDecodeDataBlocks(buf, &pReq->data);
return (void*)buf;
}
void tFreeSStreamTaskExecReq(SStreamTaskExecReq* pReq) { taosArrayDestroy(pReq->data); }
#endif

View File

@ -174,131 +174,26 @@ FAIL:
return code; return code;
} }
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* data) { int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) {
#if 0 #if 1
int8_t old = int8_t old =
atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT); atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
if (old != TASK_OUTPUT_STATUS__NORMAL) { if (old != TASK_OUTPUT_STATUS__NORMAL) {
return 0; return 0;
} }
#endif #endif
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
SRpcMsg dispatchMsg = {0};
if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, NULL) < 0) {
ASSERT(0);
return -1;
}
int32_t qType; SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue);
if (pTask->dispatchMsgType == TDMT_VND_TASK_DISPATCH || pTask->dispatchMsgType == TDMT_SND_TASK_DISPATCH) { if (pBlock == NULL) return 0;
qType = FETCH_QUEUE; ASSERT(pBlock->type == STREAM_DATA_TYPE_SSDATA_BLOCK);
} else if (pTask->dispatchMsgType == TDMT_VND_TASK_DISPATCH_WRITE) {
qType = WRITE_QUEUE;
} else {
ASSERT(0);
}
tmsgPutToQueue(pMsgCb, qType, &dispatchMsg);
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
SRpcMsg dispatchMsg = {0};
SEpSet* pEpSet = NULL;
if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, &pEpSet) < 0) {
ASSERT(0);
return -1;
}
tmsgSendReq(pEpSet, &dispatchMsg); SRpcMsg dispatchMsg = {0};
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { SEpSet* pEpSet = NULL;
SRpcMsg dispatchMsg = {0}; if (streamBuildDispatchMsg(pTask, pBlock, &dispatchMsg, &pEpSet) < 0) {
SEpSet* pEpSet = NULL; ASSERT(0);
if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, &pEpSet) < 0) {
ASSERT(0);
return -1;
}
tmsgSendReq(pEpSet, &dispatchMsg);
}
return 0;
}
#if 0
static int32_t streamBuildExecMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
SStreamTaskExecReq req = {
.streamId = pTask->streamId,
.data = data,
};
int32_t tlen = sizeof(SMsgHead) + tEncodeSStreamTaskExecReq(NULL, &req);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
return -1; return -1;
} }
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { tmsgSendReq(pEpSet, &dispatchMsg);
((SMsgHead*)buf)->vgId = 0;
req.taskId = pTask->inplaceDispatcher.taskId;
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
((SMsgHead*)buf)->vgId = htonl(pTask->fixedEpDispatcher.nodeId);
*ppEpSet = &pTask->fixedEpDispatcher.epSet;
req.taskId = pTask->fixedEpDispatcher.taskId;
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
// TODO use general name rule of schemaless
char ctbName[TSDB_TABLE_FNAME_LEN + 22] = {0};
// all groupId must be the same in an array
SSDataBlock* pBlock = taosArrayGet(data, 0);
sprintf(ctbName, "%s:%ld", pTask->shuffleDispatcher.stbFullName, pBlock->info.groupId);
// TODO: get hash function by hashMethod
// get groupId, compute hash value
uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));
// get node
// TODO: optimize search process
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t sz = taosArrayGetSize(vgInfo);
int32_t nodeId = 0;
for (int32_t i = 0; i < sz; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
nodeId = pVgInfo->vgId;
req.taskId = pVgInfo->taskId;
*ppEpSet = &pVgInfo->epSet;
break;
}
}
ASSERT(nodeId != 0);
((SMsgHead*)buf)->vgId = htonl(nodeId);
}
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncodeSStreamTaskExecReq(&abuf, &req);
pMsg->pCont = buf;
pMsg->contLen = tlen;
pMsg->code = 0;
pMsg->msgType = pTask->dispatchMsgType;
pMsg->info.noResp = 1;
return 0; return 0;
} }
static int32_t streamShuffleDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SHashObj* data) {
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(data, pIter);
if (pIter == NULL) return 0;
SArray* pData = *(SArray**)pIter;
SRpcMsg dispatchMsg = {0};
SEpSet* pEpSet;
if (streamBuildExecMsg(pTask, pData, &dispatchMsg, &pEpSet) < 0) {
ASSERT(0);
return -1;
}
tmsgSendReq(pEpSet, &dispatchMsg);
}
return 0;
}
#endif

View File

@ -41,3 +41,12 @@ void streamQueueClose(SStreamQueue* queue) {
return; return;
} }
} }
void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) {
int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1);
ASSERT(ref >= 0);
if (ref == 0) {
taosMemoryFree(pDataSubmit->data);
taosMemoryFree(pDataSubmit->dataRef);
}
}

View File

@ -1,159 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "streamInc.h"
int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb) {
SStreamQueue* queue;
if (pTask->execType == TASK_EXEC__NONE) {
queue = pTask->inputQueue;
} else {
queue = pTask->outputQueue;
}
/*if (streamDequeueBegin(queue) == true) {*/
/*return -1;*/
/*}*/
if (pTask->sinkType == TASK_SINK__TABLE || pTask->sinkType == TASK_SINK__SMA ||
pTask->dispatchType != TASK_DISPATCH__NONE) {
while (1) {
SStreamDataBlock* pBlock = streamQueueNextItem(queue);
if (pBlock == NULL) break;
ASSERT(pBlock->type == STREAM_DATA_TYPE_SSDATA_BLOCK);
// local sink
if (pTask->sinkType == TASK_SINK__TABLE) {
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
} else if (pTask->sinkType == TASK_SINK__SMA) {
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pBlock->blocks);
}
// TODO: sink and dispatch should be only one
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
ASSERT(queue == pTask->outputQueue);
ASSERT(pTask->sinkType == TASK_SINK__NONE);
streamDispatch(pTask, pMsgCb, pBlock);
}
streamQueueProcessSuccess(queue);
}
}
return 0;
}
#if 0
int32_t streamSink(SStreamTask* pTask, SMsgCb* pMsgCb) {
bool firstRun = 1;
while (1) {
SStreamDataBlock* pBlock = NULL;
if (!firstRun) {
taosReadAllQitems(pTask->outputQ, pTask->outputQAll);
}
taosGetQitem(pTask->outputQAll, (void**)&pBlock);
if (pBlock == NULL) {
if (firstRun) {
firstRun = 0;
continue;
} else {
break;
}
}
SArray* pRes = pBlock->blocks;
// sink
if (pTask->sinkType == TASK_SINK__TABLE) {
// blockDebugShowData(pRes);
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pRes);
} else if (pTask->sinkType == TASK_SINK__SMA) {
pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pRes);
//
} else if (pTask->sinkType == TASK_SINK__FETCH) {
//
} else {
ASSERT(pTask->sinkType == TASK_SINK__NONE);
}
// dispatch
// TODO dispatch guard
int8_t outputStatus = atomic_load_8(&pTask->outputStatus);
if (outputStatus == TASK_OUTPUT_STATUS__NORMAL) {
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
SRpcMsg dispatchMsg = {0};
if (streamBuildExecMsg(pTask, pRes, &dispatchMsg, NULL) < 0) {
ASSERT(0);
return -1;
}
int32_t qType;
if (pTask->dispatchMsgType == TDMT_VND_TASK_DISPATCH || pTask->dispatchMsgType == TDMT_SND_TASK_DISPATCH) {
qType = FETCH_QUEUE;
/*} else if (pTask->dispatchMsgType == TDMT_VND_TASK_MERGE_EXEC ||*/
/*pTask->dispatchMsgType == TDMT_SND_TASK_MERGE_EXEC) {*/
/*qType = MERGE_QUEUE;*/
/*} else if (pTask->dispatchMsgType == TDMT_VND_TASK_WRITE_EXEC) {*/
/*qType = WRITE_QUEUE;*/
} else {
ASSERT(0);
}
tmsgPutToQueue(pMsgCb, qType, &dispatchMsg);
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
SRpcMsg dispatchMsg = {0};
SEpSet* pEpSet = NULL;
if (streamBuildExecMsg(pTask, pRes, &dispatchMsg, &pEpSet) < 0) {
ASSERT(0);
return -1;
}
tmsgSendReq(pEpSet, &dispatchMsg);
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
SHashObj* pShuffleRes = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (pShuffleRes == NULL) {
return -1;
}
int32_t sz = taosArrayGetSize(pRes);
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pRes, i);
SArray* pArray = taosHashGet(pShuffleRes, &pDataBlock->info.groupId, sizeof(int64_t));
if (pArray == NULL) {
pArray = taosArrayInit(0, sizeof(SSDataBlock));
if (pArray == NULL) {
return -1;
}
taosHashPut(pShuffleRes, &pDataBlock->info.groupId, sizeof(int64_t), &pArray, sizeof(void*));
}
taosArrayPush(pArray, pDataBlock);
}
if (streamShuffleDispatch(pTask, pMsgCb, pShuffleRes) < 0) {
return -1;
}
} else {
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
}
}
}
return 0;
}
#endif

View File

@ -131,7 +131,7 @@ if __name__ == "__main__":
is_test_framework = 0 is_test_framework = 0
key_word = 'tdCases.addWindows' key_word = 'tdCases.addWindows'
try: try:
if key_word in open(fileName).read(): if key_word in open(fileName, encoding='UTF-8').read():
is_test_framework = 1 is_test_framework = 1
except: except:
pass pass

View File

@ -416,7 +416,7 @@ class TDDnode:
psCmd, shell=True).decode("utf-8") psCmd, shell=True).decode("utf-8")
if not platform.system().lower() == 'windows': if not platform.system().lower() == 'windows':
for port in range(6030, 6041): for port in range(6030, 6041):
fuserCmd = "fuser -k -n tcp %d" % port fuserCmd = "fuser -k -n tcp %d > /dev/null" % port
os.system(fuserCmd) os.system(fuserCmd)
if self.valgrind: if self.valgrind:
time.sleep(2) time.sleep(2)

View File

@ -143,7 +143,7 @@ class TDTestCase:
tdSql.checkRows(9) tdSql.checkRows(9)
tdSql.query("select * from t1 where c5 between 136 and 127") tdSql.query("select * from t1 where c5 between 136 and 127")
tdSql.checkRows(0) tdSql.checkRows(0)
tdSql.query("select * from t1 where c5 between '~' and 'ˆ'") tdSql.query("select * from t1 where c5 between '~' and '^'")
tdSql.checkRows(0) tdSql.checkRows(0)
tdSql.query("select * from t1 where c5 not between 1 and 2") tdSql.query("select * from t1 where c5 not between 1 and 2")
# tdSql.checkRows(0) # tdSql.checkRows(0)

View File

@ -565,7 +565,7 @@ class TDTestCase:
if data_ct4_c10[i] is None: if data_ct4_c10[i] is None:
tdSql.checkData( i, 0, None ) tdSql.checkData( i, 0, None )
else: else:
time2str = str(int(datetime.datetime.timestamp(data_ct4_c10[i])*1000)) time2str = str(int((data_ct4_c10[i]-datetime.datetime.fromtimestamp(0)).total_seconds()*1000))
tdSql.checkData( i, 0, time2str ) tdSql.checkData( i, 0, time2str )
tdSql.query("select cast(c10 as nchar(32)) as b from t1") tdSql.query("select cast(c10 as nchar(32)) as b from t1")
for i in range(len(data_t1_c10)): for i in range(len(data_t1_c10)):
@ -574,7 +574,7 @@ class TDTestCase:
elif i == 10: elif i == 10:
continue continue
else: else:
time2str = str(int(datetime.datetime.timestamp(data_t1_c10[i])*1000)) time2str = str(int((data_t1_c10[i]-datetime.datetime.fromtimestamp(0)).total_seconds()*1000))
tdSql.checkData( i, 0, time2str ) tdSql.checkData( i, 0, time2str )
tdLog.printNoPrefix("==========step38: cast timestamp to binary, expect no changes ") tdLog.printNoPrefix("==========step38: cast timestamp to binary, expect no changes ")
@ -583,7 +583,7 @@ class TDTestCase:
if data_ct4_c10[i] is None: if data_ct4_c10[i] is None:
tdSql.checkData( i, 0, None ) tdSql.checkData( i, 0, None )
else: else:
time2str = str(int(datetime.datetime.timestamp(data_ct4_c10[i])*1000)) time2str = str(int((data_ct4_c10[i]-datetime.datetime.fromtimestamp(0)).total_seconds()*1000))
tdSql.checkData( i, 0, time2str ) tdSql.checkData( i, 0, time2str )
tdSql.query("select cast(c10 as binary(32)) as b from t1") tdSql.query("select cast(c10 as binary(32)) as b from t1")
for i in range(len(data_t1_c10)): for i in range(len(data_t1_c10)):
@ -592,7 +592,7 @@ class TDTestCase:
elif i == 10: elif i == 10:
continue continue
else: else:
time2str = str(int(datetime.datetime.timestamp(data_t1_c10[i])*1000)) time2str = str(int((data_t1_c10[i]-datetime.datetime.fromtimestamp(0)).total_seconds()*1000))
tdSql.checkData( i, 0, time2str ) tdSql.checkData( i, 0, time2str )
tdLog.printNoPrefix("==========step39: cast constant operation to bigint, expect change to int ") tdLog.printNoPrefix("==========step39: cast constant operation to bigint, expect change to int ")

View File

@ -109,9 +109,6 @@ class TDTestCase:
# print("============== STEP 3 ===== query table") # print("============== STEP 3 ===== query table")
# # test error syntax # # test error syntax
tdSql.error("select * from jsons1 where jtag->tag1='beijing'") tdSql.error("select * from jsons1 where jtag->tag1='beijing'")
#tdSql.error("select * from jsons1 where jtag->'location'")
#tdSql.error("select * from jsons1 where jtag->''")
#tdSql.error("select * from jsons1 where jtag->''=9")
tdSql.error("select -> from jsons1") tdSql.error("select -> from jsons1")
tdSql.error("select * from jsons1 where contains") tdSql.error("select * from jsons1 where contains")
tdSql.error("select * from jsons1 where jtag->") tdSql.error("select * from jsons1 where jtag->")
@ -341,6 +338,13 @@ class TDTestCase:
# tdSql.checkRows(3) # tdSql.checkRows(3)
# tdSql.query("select * from jsons1 where jtag->'tag1' between 'femail' and 'beijing'") # tdSql.query("select * from jsons1 where jtag->'tag1' between 'femail' and 'beijing'")
# tdSql.checkRows(2) # tdSql.checkRows(2)
# test is true
tdSql.query("select * from jsons1 where jtag->'location'")
tdSql.checkRows(0)
tdSql.query("select * from jsons1 where jtag->'tag1'")
tdSql.checkRows(3)
# #
# # test with tbname/normal column # # test with tbname/normal column
tdSql.query("select * from jsons1 where tbname = 'jsons1_1'") tdSql.query("select * from jsons1 where tbname = 'jsons1_1'")

View File

@ -13,6 +13,9 @@ echo Linux Taosd Test
for /F "usebackq tokens=*" %%i in (fulltest.bat) do ( for /F "usebackq tokens=*" %%i in (fulltest.bat) do (
for /f "tokens=1* delims= " %%a in ("%%i") do if not "%%a" == "@REM" ( for /f "tokens=1* delims= " %%a in ("%%i") do if not "%%a" == "@REM" (
echo Processing %%i echo Processing %%i
call :GetTimeSeconds %time%
set time1=!_timeTemp!
echo Start at %time%
set /a a+=1 set /a a+=1
call %%i ARG1 -m %1 > result_!a!.txt 2>error_!a!.txt call %%i ARG1 -m %1 > result_!a!.txt 2>error_!a!.txt
if errorlevel 1 ( call :colorEcho 0c "failed" &echo. && echo result: && cat result_!a!.txt && echo error: && cat error_!a!.txt && exit 8 ) else ( call :colorEcho 0a "Success" &echo. ) if errorlevel 1 ( call :colorEcho 0c "failed" &echo. && echo result: && cat result_!a!.txt && echo error: && cat error_!a!.txt && exit 8 ) else ( call :colorEcho 0a "Success" &echo. )
@ -21,7 +24,33 @@ for /F "usebackq tokens=*" %%i in (fulltest.bat) do (
exit exit
:colorEcho :colorEcho
call :GetTimeSeconds %time%
set time2=%_timeTemp%
set /a interTime=%time2% - %time1%
echo End at %time% , cast %interTime%s
echo off echo off
<nul set /p ".=%DEL%" > "%~2" <nul set /p ".=%DEL%" > "%~2"
findstr /v /a:%1 /R "^$" "%~2" nul findstr /v /a:%1 /R "^$" "%~2" nul
del "%~2" > nul 2>&1i del "%~2" > nul 2>&1i
goto :eof
:GetTimeSeconds
set tt=%1
set tt=%tt:.= %
set tt=%tt::= %
set index=1
for %%a in (%tt%) do (
if !index! EQU 1 (
set hh=%%a
)^
else if !index! EQU 2 (
set mm=%%a
)^
else if !index! EQU 3 (
set ss=%%a
)
set /a index=index+1
)
set /a _timeTemp=(%hh%*60+%mm%)*60+%ss%
goto :eof

View File

@ -168,7 +168,7 @@ if __name__ == "__main__":
key_word = 'tdCases.addWindows' key_word = 'tdCases.addWindows'
is_test_framework = 0 is_test_framework = 0
try: try:
if key_word in open(fileName).read(): if key_word in open(fileName, encoding='UTF-8').read():
is_test_framework = 1 is_test_framework = 1
except: except:
pass pass