Merge branch '3.0' of github.com:taosdata/TDengine into 3.0
This commit is contained in:
commit
85e40668d3
|
@ -25,7 +25,7 @@ int32_t init_env() {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
|
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("error in create db, reason:%s\n", taos_errstr(pRes));
|
printf("error in create db, reason:%s\n", taos_errstr(pRes));
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -82,7 +82,7 @@ int32_t create_stream() {
|
||||||
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
|
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
|
||||||
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
|
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
|
||||||
pRes = taos_query(
|
pRes = taos_query(
|
||||||
pConn, "create stream stream1 trigger at_once into outstb as select _wstartts, sum(k) from tu1 interval(10m)");
|
pConn, "create stream stream1 trigger at_once into outstb as select _wstartts, sum(k) from st1 interval(10m)");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
|
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -933,6 +933,7 @@ typedef struct {
|
||||||
int64_t numOfProcessedFetch;
|
int64_t numOfProcessedFetch;
|
||||||
int64_t numOfProcessedDrop;
|
int64_t numOfProcessedDrop;
|
||||||
int64_t numOfProcessedHb;
|
int64_t numOfProcessedHb;
|
||||||
|
int64_t numOfProcessedDelete;
|
||||||
int64_t cacheDataSize;
|
int64_t cacheDataSize;
|
||||||
int64_t numOfQueryInQueue;
|
int64_t numOfQueryInQueue;
|
||||||
int64_t numOfFetchInQueue;
|
int64_t numOfFetchInQueue;
|
||||||
|
@ -2689,20 +2690,20 @@ int32_t tEncodeSVSubmitReq(SEncoder* pCoder, const SVSubmitReq* pReq);
|
||||||
int32_t tDecodeSVSubmitReq(SDecoder* pCoder, SVSubmitReq* pReq);
|
int32_t tDecodeSVSubmitReq(SDecoder* pCoder, SVSubmitReq* pReq);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t delUid;
|
SMsgHead header;
|
||||||
int64_t tbUid; // super/child/normal table
|
uint64_t sId;
|
||||||
int8_t type; // table type
|
uint64_t queryId;
|
||||||
int16_t nWnds;
|
uint64_t taskId;
|
||||||
char* tbFullName;
|
uint32_t sqlLen;
|
||||||
char* subPlan;
|
uint32_t phyLen;
|
||||||
STimeWindow wnds[];
|
char* sql;
|
||||||
|
char* msg;
|
||||||
} SVDeleteReq;
|
} SVDeleteReq;
|
||||||
|
|
||||||
int32_t tEncodeSVDeleteReq(SEncoder* pCoder, const SVDeleteReq* pReq);
|
int32_t tSerializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq);
|
||||||
int32_t tDecodeSVDeleteReq(SDecoder* pCoder, SVDeleteReq* pReq);
|
int32_t tDeserializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t code;
|
|
||||||
int64_t affectedRows;
|
int64_t affectedRows;
|
||||||
} SVDeleteRsp;
|
} SVDeleteRsp;
|
||||||
|
|
||||||
|
|
|
@ -187,6 +187,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_TASK_RUN, "vnode-stream-task-run", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_TASK_RUN, "vnode-stream-task-run", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DISPATCH, "vnode-stream-task-dispatch", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DISPATCH, "vnode-stream-task-dispatch", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DISPATCH_WRITE, "vnode-stream-task-dispatch-write", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_TASK_RECOVER, "vnode-stream-task-recover", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_TASK_RECOVER, "vnode-stream-task-recover", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
|
||||||
|
|
|
@ -32,6 +32,18 @@ extern "C" {
|
||||||
struct SDataSink;
|
struct SDataSink;
|
||||||
struct SSDataBlock;
|
struct SSDataBlock;
|
||||||
|
|
||||||
|
typedef struct SDeleterRes {
|
||||||
|
uint64_t uid;
|
||||||
|
SArray* uidList;
|
||||||
|
int64_t skey;
|
||||||
|
int64_t ekey;
|
||||||
|
int64_t affectedRows;
|
||||||
|
} SDeleterRes;
|
||||||
|
|
||||||
|
typedef struct SDeleterParam {
|
||||||
|
SArray* pUidList;
|
||||||
|
} SDeleterParam;
|
||||||
|
|
||||||
typedef struct SDataSinkStat {
|
typedef struct SDataSinkStat {
|
||||||
uint64_t cachedSize;
|
uint64_t cachedSize;
|
||||||
} SDataSinkStat;
|
} SDataSinkStat;
|
||||||
|
@ -64,7 +76,7 @@ typedef struct SOutputData {
|
||||||
* @param pHandle output
|
* @param pHandle output
|
||||||
* @return error code
|
* @return error code
|
||||||
*/
|
*/
|
||||||
int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle);
|
int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam);
|
||||||
|
|
||||||
int32_t dsDataSinkGetCacheSize(SDataSinkStat *pStat);
|
int32_t dsDataSinkGetCacheSize(SDataSinkStat *pStat);
|
||||||
|
|
||||||
|
|
|
@ -31,7 +31,12 @@ enum {
|
||||||
NODE_TYPE_MNODE,
|
NODE_TYPE_MNODE,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
typedef struct SDeleteRes {
|
||||||
|
uint64_t uid;
|
||||||
|
SArray* uidList;
|
||||||
|
int64_t skey;
|
||||||
|
int64_t ekey;
|
||||||
|
} SDeleteRes;
|
||||||
|
|
||||||
typedef struct SQWorkerCfg {
|
typedef struct SQWorkerCfg {
|
||||||
uint32_t maxSchedulerNum;
|
uint32_t maxSchedulerNum;
|
||||||
|
@ -47,6 +52,7 @@ typedef struct {
|
||||||
uint64_t fetchProcessed;
|
uint64_t fetchProcessed;
|
||||||
uint64_t dropProcessed;
|
uint64_t dropProcessed;
|
||||||
uint64_t hbProcessed;
|
uint64_t hbProcessed;
|
||||||
|
uint64_t deleteProcessed;
|
||||||
|
|
||||||
uint64_t numOfQueryInQueue;
|
uint64_t numOfQueryInQueue;
|
||||||
uint64_t numOfFetchInQueue;
|
uint64_t numOfFetchInQueue;
|
||||||
|
@ -74,6 +80,8 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6
|
||||||
|
|
||||||
int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts);
|
int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts);
|
||||||
|
|
||||||
|
int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *pRsp, SDeleteRes *pRes);
|
||||||
|
|
||||||
void qWorkerDestroy(void **qWorkerMgmt);
|
void qWorkerDestroy(void **qWorkerMgmt);
|
||||||
|
|
||||||
int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pStat);
|
int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pStat);
|
||||||
|
|
|
@ -285,12 +285,6 @@ struct SStreamTask {
|
||||||
|
|
||||||
int8_t inputStatus;
|
int8_t inputStatus;
|
||||||
int8_t outputStatus;
|
int8_t outputStatus;
|
||||||
#if 0
|
|
||||||
STaosQueue* inputQ;
|
|
||||||
STaosQall* inputQAll;
|
|
||||||
STaosQueue* outputQ;
|
|
||||||
STaosQall* outputQAll;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
SStreamQueue* inputQueue;
|
SStreamQueue* inputQueue;
|
||||||
SStreamQueue* outputQueue;
|
SStreamQueue* outputQueue;
|
||||||
|
@ -371,13 +365,6 @@ typedef struct {
|
||||||
int32_t taskId;
|
int32_t taskId;
|
||||||
} SStreamTaskRunReq;
|
} SStreamTaskRunReq;
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
// SMsgHead head;
|
|
||||||
int64_t streamId;
|
|
||||||
int64_t version;
|
|
||||||
SArray* res; // SArray<SSDataBlock>
|
|
||||||
} SStreamSinkReq;
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t streamId;
|
int64_t streamId;
|
||||||
int32_t taskId;
|
int32_t taskId;
|
||||||
|
@ -411,11 +398,9 @@ typedef struct {
|
||||||
int8_t inputStatus;
|
int8_t inputStatus;
|
||||||
} SStreamTaskRecoverRsp;
|
} SStreamTaskRecoverRsp;
|
||||||
|
|
||||||
int32_t streamTriggerByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb);
|
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
|
||||||
|
|
||||||
int32_t streamEnqueueDataSubmit(SStreamTask* pTask, SStreamDataSubmit* input);
|
int32_t streamTriggerByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb);
|
||||||
int32_t streamEnqueueDataBlk(SStreamTask* pTask, SStreamDataBlock* input);
|
|
||||||
int32_t streamDequeueOutput(SStreamTask* pTask, void** output);
|
|
||||||
|
|
||||||
int32_t streamTaskRun(SStreamTask* pTask);
|
int32_t streamTaskRun(SStreamTask* pTask);
|
||||||
|
|
||||||
|
|
|
@ -457,7 +457,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
|
||||||
return pRequest->code;
|
return pRequest->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_CREATE_TABLE == pRequest->type) {
|
if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type || TDMT_VND_CREATE_TABLE == pRequest->type) {
|
||||||
pRequest->body.resInfo.numOfRows = res.numOfRows;
|
pRequest->body.resInfo.numOfRows = res.numOfRows;
|
||||||
|
|
||||||
if (pRequest->body.queryJob != 0) {
|
if (pRequest->body.queryJob != 0) {
|
||||||
|
|
|
@ -932,6 +932,7 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
|
||||||
if (tEncodeI64(&encoder, pReq->qload.numOfProcessedFetch) < 0) return -1;
|
if (tEncodeI64(&encoder, pReq->qload.numOfProcessedFetch) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, pReq->qload.numOfProcessedDrop) < 0) return -1;
|
if (tEncodeI64(&encoder, pReq->qload.numOfProcessedDrop) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, pReq->qload.numOfProcessedHb) < 0) return -1;
|
if (tEncodeI64(&encoder, pReq->qload.numOfProcessedHb) < 0) return -1;
|
||||||
|
if (tEncodeI64(&encoder, pReq->qload.numOfProcessedDelete) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, pReq->qload.cacheDataSize) < 0) return -1;
|
if (tEncodeI64(&encoder, pReq->qload.cacheDataSize) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, pReq->qload.numOfQueryInQueue) < 0) return -1;
|
if (tEncodeI64(&encoder, pReq->qload.numOfQueryInQueue) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, pReq->qload.numOfFetchInQueue) < 0) return -1;
|
if (tEncodeI64(&encoder, pReq->qload.numOfFetchInQueue) < 0) return -1;
|
||||||
|
@ -1001,6 +1002,7 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
|
||||||
if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedFetch) < 0) return -1;
|
if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedFetch) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedDrop) < 0) return -1;
|
if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedDrop) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedHb) < 0) return -1;
|
if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedHb) < 0) return -1;
|
||||||
|
if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedDelete) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &pReq->qload.cacheDataSize) < 0) return -1;
|
if (tDecodeI64(&decoder, &pReq->qload.cacheDataSize) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &pReq->qload.numOfQueryInQueue) < 0) return -1;
|
if (tDecodeI64(&decoder, &pReq->qload.numOfQueryInQueue) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &pReq->qload.numOfFetchInQueue) < 0) return -1;
|
if (tDecodeI64(&decoder, &pReq->qload.numOfFetchInQueue) < 0) return -1;
|
||||||
|
@ -3823,46 +3825,70 @@ int32_t tDecodeSVGetTsmaExpWndsRsp(SDecoder *pCoder, SVGetTsmaExpWndsRsp *pReq)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tEncodeSVDeleteReq(SEncoder *pCoder, const SVDeleteReq *pReq) {
|
int32_t tSerializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) {
|
||||||
if (tStartEncode(pCoder) < 0) return -1;
|
int32_t headLen = sizeof(SMsgHead);
|
||||||
|
if (buf != NULL) {
|
||||||
if (tEncodeI64(pCoder, pReq->delUid) < 0) return -1;
|
buf = (char *)buf + headLen;
|
||||||
if (tEncodeI64(pCoder, pReq->tbUid) < 0) return -1;
|
bufLen -= headLen;
|
||||||
if (tEncodeI8(pCoder, pReq->type) < 0) return -1;
|
|
||||||
if (tEncodeI16v(pCoder, pReq->nWnds) < 0) return -1;
|
|
||||||
if (tEncodeCStr(pCoder, pReq->tbFullName) < 0) return -1;
|
|
||||||
if (tEncodeCStr(pCoder, pReq->subPlan) < 0) return -1;
|
|
||||||
for (int16_t i = 0; i < pReq->nWnds; ++i) {
|
|
||||||
if (tEncodeI64(pCoder, pReq->wnds[i].skey) < 0) return -1;
|
|
||||||
if (tEncodeI64(pCoder, pReq->wnds[i].ekey) < 0) return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tEndEncode(pCoder);
|
SEncoder encoder = {0};
|
||||||
return 0;
|
tEncoderInit(&encoder, buf, bufLen);
|
||||||
|
|
||||||
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
|
if (tEncodeU64(&encoder, pReq->sId) < 0) return -1;
|
||||||
|
if (tEncodeU64(&encoder, pReq->queryId) < 0) return -1;
|
||||||
|
if (tEncodeU64(&encoder, pReq->taskId) < 0) return -1;
|
||||||
|
if (tEncodeU32(&encoder, pReq->sqlLen) < 0) return -1;
|
||||||
|
if (tEncodeU32(&encoder, pReq->phyLen) < 0) return -1;
|
||||||
|
if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
|
||||||
|
if (tEncodeCStr(&encoder, pReq->msg) < 0) return -1;
|
||||||
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
int32_t tlen = encoder.pos;
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
|
||||||
|
if (buf != NULL) {
|
||||||
|
SMsgHead *pHead = (SMsgHead *)((char *)buf - headLen);
|
||||||
|
pHead->vgId = htonl(pReq->header.vgId);
|
||||||
|
pHead->contLen = htonl(tlen + headLen);
|
||||||
|
}
|
||||||
|
|
||||||
|
return tlen + headLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDecodeSVDeleteReq(SDecoder *pCoder, SVDeleteReq *pReq) {
|
int32_t tDeserializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) {
|
||||||
if (tStartDecode(pCoder) < 0) return -1;
|
int32_t headLen = sizeof(SMsgHead);
|
||||||
|
|
||||||
if (tDecodeI64(pCoder, &pReq->delUid) < 0) return -1;
|
SMsgHead *pHead = buf;
|
||||||
if (tDecodeI64(pCoder, &pReq->tbUid) < 0) return -1;
|
pHead->vgId = pReq->header.vgId;
|
||||||
if (tDecodeI8(pCoder, &pReq->type) < 0) return -1;
|
pHead->contLen = pReq->header.contLen;
|
||||||
if (tDecodeI16v(pCoder, &pReq->nWnds) < 0) return -1;
|
|
||||||
if (tDecodeCStr(pCoder, &pReq->tbFullName) < 0) return -1;
|
|
||||||
if (tDecodeCStr(pCoder, &pReq->subPlan) < 0) return -1;
|
|
||||||
for (int16_t i = 0; i < pReq->nWnds; ++i) {
|
|
||||||
if (tDecodeI64(pCoder, &pReq->wnds[i].skey) < 0) return -1;
|
|
||||||
if (tDecodeI64(pCoder, &pReq->wnds[i].ekey) < 0) return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
tEndDecode(pCoder);
|
SDecoder decoder = {0};
|
||||||
|
tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen);
|
||||||
|
|
||||||
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
|
if (tDecodeU64(&decoder, &pReq->sId) < 0) return -1;
|
||||||
|
if (tDecodeU64(&decoder, &pReq->queryId) < 0) return -1;
|
||||||
|
if (tDecodeU64(&decoder, &pReq->taskId) < 0) return -1;
|
||||||
|
if (tDecodeU32(&decoder, &pReq->sqlLen) < 0) return -1;
|
||||||
|
if (tDecodeU32(&decoder, &pReq->phyLen) < 0) return -1;
|
||||||
|
pReq->sql = taosMemoryCalloc(1, pReq->sqlLen + 1);
|
||||||
|
if (NULL == pReq->sql) return -1;
|
||||||
|
pReq->msg = taosMemoryCalloc(1, pReq->phyLen + 1);
|
||||||
|
if (NULL == pReq->msg) return -1;
|
||||||
|
if (tDecodeCStrTo(&decoder, pReq->sql) < 0) return -1;
|
||||||
|
if (tDecodeCStrTo(&decoder, pReq->msg) < 0) return -1;
|
||||||
|
|
||||||
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
|
tDecoderClear(&decoder);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tEncodeSVDeleteRsp(SEncoder *pCoder, const SVDeleteRsp *pReq) {
|
int32_t tEncodeSVDeleteRsp(SEncoder *pCoder, const SVDeleteRsp *pReq) {
|
||||||
if (tStartEncode(pCoder) < 0) return -1;
|
if (tStartEncode(pCoder) < 0) return -1;
|
||||||
|
|
||||||
if (tEncodeI32(pCoder, pReq->code) < 0) return -1;
|
|
||||||
if (tEncodeI64(pCoder, pReq->affectedRows) < 0) return -1;
|
if (tEncodeI64(pCoder, pReq->affectedRows) < 0) return -1;
|
||||||
|
|
||||||
tEndEncode(pCoder);
|
tEndEncode(pCoder);
|
||||||
|
@ -3872,7 +3898,6 @@ int32_t tEncodeSVDeleteRsp(SEncoder *pCoder, const SVDeleteRsp *pReq) {
|
||||||
int32_t tDecodeSVDeleteRsp(SDecoder *pCoder, SVDeleteRsp *pReq) {
|
int32_t tDecodeSVDeleteRsp(SDecoder *pCoder, SVDeleteRsp *pReq) {
|
||||||
if (tStartDecode(pCoder) < 0) return -1;
|
if (tStartDecode(pCoder) < 0) return -1;
|
||||||
|
|
||||||
if (tDecodeI32(pCoder, &pReq->code) < 0) return -1;
|
|
||||||
if (tDecodeI64(pCoder, &pReq->affectedRows) < 0) return -1;
|
if (tDecodeI64(pCoder, &pReq->affectedRows) < 0) return -1;
|
||||||
|
|
||||||
tEndDecode(pCoder);
|
tEndDecode(pCoder);
|
||||||
|
|
|
@ -358,6 +358,7 @@ SArray *vmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RUN, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RUN, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DISPATCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DISPATCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RECOVER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RECOVER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
|
@ -59,6 +59,7 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) {
|
||||||
pLoad->numOfProcessedFetch = stat.fetchProcessed;
|
pLoad->numOfProcessedFetch = stat.fetchProcessed;
|
||||||
pLoad->numOfProcessedDrop = stat.dropProcessed;
|
pLoad->numOfProcessedDrop = stat.dropProcessed;
|
||||||
pLoad->numOfProcessedHb = stat.hbProcessed;
|
pLoad->numOfProcessedHb = stat.hbProcessed;
|
||||||
|
pLoad->numOfProcessedDelete = stat.deleteProcessed;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -216,8 +216,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO wrap in destroy func
|
// TODO wrap in destroy func
|
||||||
taosArrayDestroy(rsp.blockData);
|
|
||||||
taosArrayDestroy(rsp.blockDataLen);
|
taosArrayDestroy(rsp.blockDataLen);
|
||||||
|
taosArrayDestroyP(rsp.blockData, (FDelete)taosMemoryFree);
|
||||||
|
|
||||||
if (rsp.withSchema) {
|
if (rsp.withSchema) {
|
||||||
taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
||||||
|
@ -421,10 +421,20 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
SStreamDispatchReq* pReq = pMsg->pCont;
|
char* msgStr = pMsg->pCont;
|
||||||
int32_t taskId = pReq->taskId;
|
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
|
||||||
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||||
streamProcessDispatchReq(pTask, &pTq->pVnode->msgCb, pReq, pMsg);
|
SStreamDispatchReq req;
|
||||||
|
SDecoder decoder;
|
||||||
|
tDecoderInit(&decoder, msgBody, msgLen);
|
||||||
|
tDecodeStreamDispatchReq(&decoder, &req);
|
||||||
|
int32_t taskId = req.taskId;
|
||||||
|
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||||
|
SRpcMsg rsp = {
|
||||||
|
.info = pMsg->info,
|
||||||
|
.code = 0,
|
||||||
|
};
|
||||||
|
streamProcessDispatchReq(pTask, &pTq->pVnode->msgCb, &req, &rsp);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,6 +24,7 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq
|
||||||
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||||
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||||
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||||
|
static int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRpcMsg *pRsp);
|
||||||
|
|
||||||
int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) {
|
int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -142,6 +143,9 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
||||||
case TDMT_VND_SUBMIT:
|
case TDMT_VND_SUBMIT:
|
||||||
if (vnodeProcessSubmitReq(pVnode, version, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err;
|
if (vnodeProcessSubmitReq(pVnode, version, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err;
|
||||||
break;
|
break;
|
||||||
|
case TDMT_VND_DELETE:
|
||||||
|
if (vnodeProcessWriteMsg(pVnode, version, pMsg, pRsp) < 0) goto _err;
|
||||||
|
break;
|
||||||
/* TQ */
|
/* TQ */
|
||||||
case TDMT_VND_MQ_VG_CHANGE:
|
case TDMT_VND_MQ_VG_CHANGE:
|
||||||
if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
||||||
|
@ -256,6 +260,22 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||||
|
vTrace("message in write queue is processing");
|
||||||
|
char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||||
|
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||||
|
SDeleteRes res = {0};
|
||||||
|
SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
|
||||||
|
|
||||||
|
switch (pMsg->msgType) {
|
||||||
|
case TDMT_VND_DELETE:
|
||||||
|
return qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, pRsp, &res);
|
||||||
|
default:
|
||||||
|
vError("unknown msg type:%d in write queue", pMsg->msgType);
|
||||||
|
return TSDB_CODE_VND_APP_ERROR;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: remove the function
|
// TODO: remove the function
|
||||||
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
|
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
|
||||||
// TODO
|
// TODO
|
||||||
|
@ -873,4 +893,4 @@ static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void
|
||||||
pRsp->contLen = 0;
|
pRsp->contLen = 0;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,6 +49,7 @@ typedef struct SDataSinkHandle {
|
||||||
} SDataSinkHandle;
|
} SDataSinkHandle;
|
||||||
|
|
||||||
int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle);
|
int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle);
|
||||||
|
int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void *pParam);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -869,6 +869,7 @@ int32_t decodeOperator(SOperatorInfo* ops, char* data, int32_t length);
|
||||||
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
|
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
|
||||||
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
|
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
|
||||||
EOPTR_EXEC_MODEL model);
|
EOPTR_EXEC_MODEL model);
|
||||||
|
int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pTaskInfo);
|
||||||
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity,
|
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity,
|
||||||
int32_t* resNum);
|
int32_t* resNum);
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,254 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "dataSinkInt.h"
|
||||||
|
#include "dataSinkMgt.h"
|
||||||
|
#include "executorimpl.h"
|
||||||
|
#include "planner.h"
|
||||||
|
#include "tcompression.h"
|
||||||
|
#include "tdatablock.h"
|
||||||
|
#include "tglobal.h"
|
||||||
|
#include "tqueue.h"
|
||||||
|
|
||||||
|
extern SDataSinkStat gDataSinkStat;
|
||||||
|
|
||||||
|
typedef struct SDataDeleterBuf {
|
||||||
|
int32_t useSize;
|
||||||
|
int32_t allocSize;
|
||||||
|
char* pData;
|
||||||
|
} SDataDeleterBuf;
|
||||||
|
|
||||||
|
typedef struct SDataCacheEntry {
|
||||||
|
int32_t dataLen;
|
||||||
|
int32_t numOfRows;
|
||||||
|
int32_t numOfCols;
|
||||||
|
int8_t compressed;
|
||||||
|
char data[];
|
||||||
|
} SDataCacheEntry;
|
||||||
|
|
||||||
|
typedef struct SDataDeleterHandle {
|
||||||
|
SDataSinkHandle sink;
|
||||||
|
SDataSinkManager* pManager;
|
||||||
|
SDataBlockDescNode* pSchema;
|
||||||
|
SDataDeleterNode* pDeleter;
|
||||||
|
SDeleterParam* pParam;
|
||||||
|
STaosQueue* pDataBlocks;
|
||||||
|
SDataDeleterBuf nextOutput;
|
||||||
|
int32_t status;
|
||||||
|
bool queryEnd;
|
||||||
|
uint64_t useconds;
|
||||||
|
uint64_t cachedSize;
|
||||||
|
TdThreadMutex mutex;
|
||||||
|
} SDataDeleterHandle;
|
||||||
|
|
||||||
|
static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) {
|
||||||
|
if (tsCompressColData < 0 || 0 == pData->info.rows) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t col = 0; col < numOfCols; ++col) {
|
||||||
|
SColumnInfoData* pColRes = taosArrayGet(pData->pDataBlock, col);
|
||||||
|
int32_t colSize = pColRes->info.bytes * pData->info.rows;
|
||||||
|
if (NEEDTO_COMPRESS_QUERY(colSize)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInput, SDataDeleterBuf* pBuf) {
|
||||||
|
int32_t numOfCols = LIST_LENGTH(pHandle->pSchema->pSlots);
|
||||||
|
|
||||||
|
SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
|
||||||
|
pEntry->compressed = 0;
|
||||||
|
pEntry->numOfRows = pInput->pData->info.rows;
|
||||||
|
pEntry->numOfCols = pInput->pData->info.numOfCols;
|
||||||
|
pEntry->dataLen = sizeof(SDeleterRes);
|
||||||
|
|
||||||
|
ASSERT(1 == pEntry->numOfRows);
|
||||||
|
ASSERT(1 == pEntry->numOfCols);
|
||||||
|
|
||||||
|
pBuf->useSize = sizeof(SDataCacheEntry);
|
||||||
|
|
||||||
|
SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 0);
|
||||||
|
|
||||||
|
SDeleterRes* pRes = (SDeleterRes*)pEntry->data;
|
||||||
|
pRes->uid = pHandle->pDeleter->tableId;
|
||||||
|
pRes->uidList = pHandle->pParam->pUidList;
|
||||||
|
pRes->skey = pHandle->pDeleter->deleteTimeRange.skey;
|
||||||
|
pRes->ekey = pHandle->pDeleter->deleteTimeRange.ekey;
|
||||||
|
pRes->affectedRows = *(int64_t*)pColRes->pData;
|
||||||
|
|
||||||
|
pBuf->useSize += pEntry->dataLen;
|
||||||
|
|
||||||
|
atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen);
|
||||||
|
atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool allocBuf(SDataDeleterHandle* pDeleter, const SInputData* pInput, SDataDeleterBuf* pBuf) {
|
||||||
|
uint32_t capacity = pDeleter->pManager->cfg.maxDataBlockNumPerQuery;
|
||||||
|
if (taosQueueItemSize(pDeleter->pDataBlocks) > capacity) {
|
||||||
|
qError("SinkNode queue is full, no capacity, max:%d, current:%d, no capacity", capacity,
|
||||||
|
taosQueueItemSize(pDeleter->pDataBlocks));
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
pBuf->allocSize = sizeof(SDataCacheEntry) + sizeof(SDeleterRes);
|
||||||
|
|
||||||
|
pBuf->pData = taosMemoryMalloc(pBuf->allocSize);
|
||||||
|
if (pBuf->pData == NULL) {
|
||||||
|
qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno));
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL != pBuf->pData;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t updateStatus(SDataDeleterHandle* pDeleter) {
|
||||||
|
taosThreadMutexLock(&pDeleter->mutex);
|
||||||
|
int32_t blockNums = taosQueueItemSize(pDeleter->pDataBlocks);
|
||||||
|
int32_t status =
|
||||||
|
(0 == blockNums ? DS_BUF_EMPTY
|
||||||
|
: (blockNums < pDeleter->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL));
|
||||||
|
pDeleter->status = status;
|
||||||
|
taosThreadMutexUnlock(&pDeleter->mutex);
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t getStatus(SDataDeleterHandle* pDeleter) {
|
||||||
|
taosThreadMutexLock(&pDeleter->mutex);
|
||||||
|
int32_t status = pDeleter->status;
|
||||||
|
taosThreadMutexUnlock(&pDeleter->mutex);
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
|
||||||
|
SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
|
||||||
|
SDataDeleterBuf* pBuf = taosAllocateQitem(sizeof(SDataDeleterBuf), DEF_QITEM);
|
||||||
|
if (NULL == pBuf || !allocBuf(pDeleter, pInput, pBuf)) {
|
||||||
|
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
toDataCacheEntry(pDeleter, pInput, pBuf);
|
||||||
|
taosWriteQitem(pDeleter->pDataBlocks, pBuf);
|
||||||
|
*pContinue = (DS_BUF_LOW == updateStatus(pDeleter) ? true : false);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
|
||||||
|
SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
|
||||||
|
taosThreadMutexLock(&pDeleter->mutex);
|
||||||
|
pDeleter->queryEnd = true;
|
||||||
|
pDeleter->useconds = useconds;
|
||||||
|
taosThreadMutexUnlock(&pDeleter->mutex);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd) {
|
||||||
|
SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
|
||||||
|
if (taosQueueEmpty(pDeleter->pDataBlocks)) {
|
||||||
|
*pQueryEnd = pDeleter->queryEnd;
|
||||||
|
*pLen = 0;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
SDataDeleterBuf* pBuf = NULL;
|
||||||
|
taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
|
||||||
|
memcpy(&pDeleter->nextOutput, pBuf, sizeof(SDataDeleterBuf));
|
||||||
|
taosFreeQitem(pBuf);
|
||||||
|
*pLen = ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->dataLen;
|
||||||
|
*pQueryEnd = pDeleter->queryEnd;
|
||||||
|
qDebug("got data len %d, row num %d in sink", *pLen, ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->numOfRows);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
|
||||||
|
SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
|
||||||
|
if (NULL == pDeleter->nextOutput.pData) {
|
||||||
|
assert(pDeleter->queryEnd);
|
||||||
|
pOutput->useconds = pDeleter->useconds;
|
||||||
|
pOutput->precision = pDeleter->pSchema->precision;
|
||||||
|
pOutput->bufStatus = DS_BUF_EMPTY;
|
||||||
|
pOutput->queryEnd = pDeleter->queryEnd;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDeleter->nextOutput.pData);
|
||||||
|
memcpy(pOutput->pData, pEntry->data, pEntry->dataLen);
|
||||||
|
pOutput->numOfRows = pEntry->numOfRows;
|
||||||
|
pOutput->numOfCols = pEntry->numOfCols;
|
||||||
|
pOutput->compressed = pEntry->compressed;
|
||||||
|
|
||||||
|
atomic_sub_fetch_64(&pDeleter->cachedSize, pEntry->dataLen);
|
||||||
|
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
|
||||||
|
|
||||||
|
taosMemoryFreeClear(pDeleter->nextOutput.pData); // todo persistent
|
||||||
|
pOutput->bufStatus = updateStatus(pDeleter);
|
||||||
|
taosThreadMutexLock(&pDeleter->mutex);
|
||||||
|
pOutput->queryEnd = pDeleter->queryEnd;
|
||||||
|
pOutput->useconds = pDeleter->useconds;
|
||||||
|
pOutput->precision = pDeleter->pSchema->precision;
|
||||||
|
taosThreadMutexUnlock(&pDeleter->mutex);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
|
||||||
|
SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
|
||||||
|
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDeleter->cachedSize);
|
||||||
|
taosMemoryFreeClear(pDeleter->nextOutput.pData);
|
||||||
|
while (!taosQueueEmpty(pDeleter->pDataBlocks)) {
|
||||||
|
SDataDeleterBuf* pBuf = NULL;
|
||||||
|
taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
|
||||||
|
taosMemoryFreeClear(pBuf->pData);
|
||||||
|
taosFreeQitem(pBuf);
|
||||||
|
}
|
||||||
|
taosCloseQueue(pDeleter->pDataBlocks);
|
||||||
|
taosThreadMutexDestroy(&pDeleter->mutex);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
|
||||||
|
SDataDeleterHandle* pDispatcher = (SDataDeleterHandle*)pHandle;
|
||||||
|
|
||||||
|
*size = atomic_load_64(&pDispatcher->cachedSize);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void *pParam) {
|
||||||
|
SDataDeleterHandle* deleter = taosMemoryCalloc(1, sizeof(SDataDeleterHandle));
|
||||||
|
if (NULL == deleter) {
|
||||||
|
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
|
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
SDataDeleterNode* pDeleterNode = (SDataDeleterNode *)pDataSink;
|
||||||
|
deleter->sink.fPut = putDataBlock;
|
||||||
|
deleter->sink.fEndPut = endPut;
|
||||||
|
deleter->sink.fGetLen = getDataLength;
|
||||||
|
deleter->sink.fGetData = getDataBlock;
|
||||||
|
deleter->sink.fDestroy = destroyDataSinker;
|
||||||
|
deleter->sink.fGetCacheSize = getCacheSize;
|
||||||
|
deleter->pManager = pManager;
|
||||||
|
deleter->pDeleter = pDeleterNode;
|
||||||
|
deleter->pSchema = pDataSink->pInputDataBlockDesc;
|
||||||
|
deleter->pParam = pParam;
|
||||||
|
deleter->status = DS_BUF_EMPTY;
|
||||||
|
deleter->queryEnd = false;
|
||||||
|
deleter->pDataBlocks = taosOpenQueue();
|
||||||
|
taosThreadMutexInit(&deleter->mutex, NULL);
|
||||||
|
if (NULL == deleter->pDataBlocks) {
|
||||||
|
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
|
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
*pHandle = deleter;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
|
@ -83,7 +83,7 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn
|
||||||
pEntry->numOfCols = pInput->pData->info.numOfCols;
|
pEntry->numOfCols = pInput->pData->info.numOfCols;
|
||||||
pEntry->dataLen = 0;
|
pEntry->dataLen = 0;
|
||||||
|
|
||||||
pBuf->useSize = sizeof(SRetrieveTableRsp);
|
pBuf->useSize = sizeof(SDataCacheEntry);
|
||||||
blockCompressEncode(pInput->pData, pEntry->data, &pEntry->dataLen, numOfCols, pEntry->compressed);
|
blockCompressEncode(pInput->pData, pEntry->data, &pEntry->dataLen, numOfCols, pEntry->compressed);
|
||||||
|
|
||||||
pBuf->useSize += pEntry->dataLen;
|
pBuf->useSize += pEntry->dataLen;
|
||||||
|
@ -100,7 +100,7 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput,
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
pBuf->allocSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pInput->pData);
|
pBuf->allocSize = sizeof(SDataCacheEntry) + blockGetEncodeSize(pInput->pData);
|
||||||
|
|
||||||
pBuf->pData = taosMemoryMalloc(pBuf->allocSize);
|
pBuf->pData = taosMemoryMalloc(pBuf->allocSize);
|
||||||
if (pBuf->pData == NULL) {
|
if (pBuf->pData == NULL) {
|
||||||
|
@ -211,7 +211,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
|
static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
|
||||||
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
||||||
|
|
||||||
*size = atomic_load_64(&pDispatcher->cachedSize);
|
*size = atomic_load_64(&pDispatcher->cachedSize);
|
||||||
|
|
|
@ -34,9 +34,12 @@ int32_t dsDataSinkGetCacheSize(SDataSinkStat *pStat) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t dsCreateDataSinker(const SDataSinkNode *pDataSink, DataSinkHandle* pHandle) {
|
int32_t dsCreateDataSinker(const SDataSinkNode *pDataSink, DataSinkHandle* pHandle, void* pParam) {
|
||||||
if (QUERY_NODE_PHYSICAL_PLAN_DISPATCH == nodeType(pDataSink)) {
|
switch (nodeType(pDataSink)) {
|
||||||
return createDataDispatcher(&gDataSinkManager, pDataSink, pHandle);
|
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
|
||||||
|
return createDataDispatcher(&gDataSinkManager, pDataSink, pHandle);
|
||||||
|
case QUERY_NODE_PHYSICAL_PLAN_DELETE:
|
||||||
|
return createDataDeleter(&gDataSinkManager, pDataSink, pHandle, pParam);
|
||||||
}
|
}
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,8 +45,15 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (handle) {
|
if (handle) {
|
||||||
code = dsCreateDataSinker(pSubplan->pDataSink, handle);
|
void* pSinkParam = NULL;
|
||||||
|
code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam);
|
||||||
}
|
}
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
|
|
|
@ -5099,6 +5099,37 @@ int32_t decodeOperator(SOperatorInfo* ops, char* result, int32_t length) {
|
||||||
return TDB_CODE_SUCCESS;
|
return TDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pTaskInfo) {
|
||||||
|
SExecTaskInfo* pTask = *(SExecTaskInfo**)pTaskInfo;
|
||||||
|
|
||||||
|
switch (pNode->type) {
|
||||||
|
case QUERY_NODE_PHYSICAL_PLAN_DELETE: {
|
||||||
|
SDeleterParam *pDeleterParam = taosMemoryCalloc(1, sizeof(SDeleterParam));
|
||||||
|
if (NULL == pDeleterParam) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
int32_t tbNum = taosArrayGetSize(pTask->tableqinfoList.pTableList);
|
||||||
|
pDeleterParam->pUidList = taosArrayInit(tbNum, sizeof(uint64_t));
|
||||||
|
if (NULL == pDeleterParam->pUidList) {
|
||||||
|
taosMemoryFree(pDeleterParam);
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
for (int32_t i = 0; i < tbNum; ++i) {
|
||||||
|
STableKeyInfo *pTable = taosArrayGet(pTask->tableqinfoList.pTableList, i);
|
||||||
|
taosArrayPush(pDeleterParam->pUidList, &pTable->uid);
|
||||||
|
}
|
||||||
|
|
||||||
|
*pParam = pDeleterParam;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
|
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
|
||||||
EOPTR_EXEC_MODEL model) {
|
EOPTR_EXEC_MODEL model) {
|
||||||
uint64_t queryId = pPlan->id.queryId;
|
uint64_t queryId = pPlan->id.queryId;
|
||||||
|
|
|
@ -569,6 +569,7 @@ int32_t tSerializeSQnodeLoad(void *buf, int32_t bufLen, SQnodeLoad *pInfo) {
|
||||||
if (tEncodeI64(&encoder, pInfo->numOfProcessedFetch) < 0) return -1;
|
if (tEncodeI64(&encoder, pInfo->numOfProcessedFetch) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, pInfo->numOfProcessedDrop) < 0) return -1;
|
if (tEncodeI64(&encoder, pInfo->numOfProcessedDrop) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, pInfo->numOfProcessedHb) < 0) return -1;
|
if (tEncodeI64(&encoder, pInfo->numOfProcessedHb) < 0) return -1;
|
||||||
|
if (tEncodeI64(&encoder, pInfo->numOfProcessedDelete) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, pInfo->cacheDataSize) < 0) return -1;
|
if (tEncodeI64(&encoder, pInfo->cacheDataSize) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, pInfo->numOfQueryInQueue) < 0) return -1;
|
if (tEncodeI64(&encoder, pInfo->numOfQueryInQueue) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, pInfo->numOfFetchInQueue) < 0) return -1;
|
if (tEncodeI64(&encoder, pInfo->numOfFetchInQueue) < 0) return -1;
|
||||||
|
@ -591,6 +592,7 @@ int32_t tDeserializeSQnodeLoad(void *buf, int32_t bufLen, SQnodeLoad *pInfo) {
|
||||||
if (tDecodeI64(&decoder, &pInfo->numOfProcessedFetch) < 0) return -1;
|
if (tDecodeI64(&decoder, &pInfo->numOfProcessedFetch) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &pInfo->numOfProcessedDrop) < 0) return -1;
|
if (tDecodeI64(&decoder, &pInfo->numOfProcessedDrop) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &pInfo->numOfProcessedHb) < 0) return -1;
|
if (tDecodeI64(&decoder, &pInfo->numOfProcessedHb) < 0) return -1;
|
||||||
|
if (tDecodeI64(&decoder, &pInfo->numOfProcessedDelete) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &pInfo->cacheDataSize) < 0) return -1;
|
if (tDecodeI64(&decoder, &pInfo->cacheDataSize) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &pInfo->numOfQueryInQueue) < 0) return -1;
|
if (tDecodeI64(&decoder, &pInfo->numOfQueryInQueue) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &pInfo->numOfFetchInQueue) < 0) return -1;
|
if (tDecodeI64(&decoder, &pInfo->numOfFetchInQueue) < 0) return -1;
|
||||||
|
|
|
@ -5015,6 +5015,10 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
|
||||||
pQuery->haveResultSet = true;
|
pQuery->haveResultSet = true;
|
||||||
pQuery->msgType = TDMT_VND_QUERY;
|
pQuery->msgType = TDMT_VND_QUERY;
|
||||||
break;
|
break;
|
||||||
|
case QUERY_NODE_DELETE_STMT:
|
||||||
|
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
|
||||||
|
pQuery->msgType = TDMT_VND_DELETE;
|
||||||
|
break;
|
||||||
case QUERY_NODE_VNODE_MODIF_STMT:
|
case QUERY_NODE_VNODE_MODIF_STMT:
|
||||||
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
|
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
|
||||||
pQuery->msgType = toMsgType(((SVnodeModifOpStmt*)pQuery->pRoot)->sqlNodeType);
|
pQuery->msgType = toMsgType(((SVnodeModifOpStmt*)pQuery->pRoot)->sqlNodeType);
|
||||||
|
|
|
@ -1395,6 +1395,7 @@ static int32_t buildDeleteSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = createDataDeleter(pCxt, pModify, pSubplan->pNode, &pSubplan->pDataSink);
|
code = createDataDeleter(pCxt, pModify, pSubplan->pNode, &pSubplan->pDataSink);
|
||||||
}
|
}
|
||||||
|
pSubplan->msgType = TDMT_VND_DELETE;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -88,7 +88,7 @@ int32_t qSetSubplanExecutionNode(SSubplan* subplan, int32_t groupId, SDownstream
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen) {
|
int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen) {
|
||||||
if (SUBPLAN_TYPE_MODIFY == pSubplan->subplanType) {
|
if (SUBPLAN_TYPE_MODIFY == pSubplan->subplanType && NULL == pSubplan->pNode) {
|
||||||
SDataInserterNode* insert = (SDataInserterNode*)pSubplan->pDataSink;
|
SDataInserterNode* insert = (SDataInserterNode*)pSubplan->pDataSink;
|
||||||
*pLen = insert->size;
|
*pLen = insert->size;
|
||||||
*pStr = insert->pData;
|
*pStr = insert->pData;
|
||||||
|
|
|
@ -160,6 +160,7 @@ typedef struct SQWMsgStat {
|
||||||
uint64_t cancelProcessed;
|
uint64_t cancelProcessed;
|
||||||
uint64_t dropProcessed;
|
uint64_t dropProcessed;
|
||||||
uint64_t hbProcessed;
|
uint64_t hbProcessed;
|
||||||
|
uint64_t deleteProcessed;
|
||||||
} SQWMsgStat;
|
} SQWMsgStat;
|
||||||
|
|
||||||
typedef struct SQWRTStat {
|
typedef struct SQWRTStat {
|
||||||
|
@ -357,6 +358,7 @@ int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type);
|
||||||
int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type);
|
int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type);
|
||||||
void qwClearExpiredSch(SArray* pExpiredSch);
|
void qwClearExpiredSch(SArray* pExpiredSch);
|
||||||
int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch);
|
int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch);
|
||||||
|
void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx);
|
||||||
|
|
||||||
void qwDbgDumpMgmtInfo(SQWorker *mgmt);
|
void qwDbgDumpMgmtInfo(SQWorker *mgmt);
|
||||||
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore);
|
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore);
|
||||||
|
|
|
@ -30,6 +30,7 @@ int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg);
|
||||||
int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg);
|
int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg);
|
||||||
int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg);
|
int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg);
|
||||||
int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req);
|
int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req);
|
||||||
|
int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SRpcMsg *pRsp, SDeleteRes *pRes);
|
||||||
|
|
||||||
int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code);
|
int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code);
|
||||||
int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code);
|
int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code);
|
||||||
|
|
|
@ -300,13 +300,6 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
|
||||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
msg->sId = msg->sId;
|
|
||||||
msg->queryId = msg->queryId;
|
|
||||||
msg->taskId = msg->taskId;
|
|
||||||
msg->refId = msg->refId;
|
|
||||||
msg->phyLen = msg->phyLen;
|
|
||||||
msg->sqlLen = msg->sqlLen;
|
|
||||||
|
|
||||||
uint64_t sId = msg->sId;
|
uint64_t sId = msg->sId;
|
||||||
uint64_t qId = msg->queryId;
|
uint64_t qId = msg->queryId;
|
||||||
uint64_t tId = msg->taskId;
|
uint64_t tId = msg->taskId;
|
||||||
|
@ -523,3 +516,37 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *pRsp, SDeleteRes *pRes) {
|
||||||
|
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == pRsp) {
|
||||||
|
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = 0;
|
||||||
|
SVDeleteReq req = {0};
|
||||||
|
SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
|
||||||
|
|
||||||
|
QW_STAT_INC(mgmt->stat.msgStat.deleteProcessed, 1);
|
||||||
|
|
||||||
|
tDeserializeSVDeleteReq(pMsg->pCont, pMsg->contLen, &req);
|
||||||
|
|
||||||
|
uint64_t sId = req.sId;
|
||||||
|
uint64_t qId = req.queryId;
|
||||||
|
uint64_t tId = req.taskId;
|
||||||
|
int64_t rId = 0;
|
||||||
|
|
||||||
|
SQWMsg qwMsg = {.node = node, .msg = req.msg, .msgLen = req.phyLen, .connInfo = pMsg->info};
|
||||||
|
QW_SCH_TASK_DLOG("processDelete start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, req.sql);
|
||||||
|
taosMemoryFreeClear(req.sql);
|
||||||
|
|
||||||
|
QW_ERR_JRET(qwProcessDelete(QW_FPARAMS(), &qwMsg, pRsp, pRes));
|
||||||
|
|
||||||
|
QW_SCH_TASK_DLOG("processDelete end, node:%p", node);
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
|
QW_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -290,8 +290,11 @@ int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
|
||||||
QW_RET(code);
|
QW_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
|
void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
|
||||||
tmsgReleaseHandle(&ctx->ctrlConnInfo, TAOS_CONN_SERVER);
|
if (ctx->ctrlConnInfo.handle) {
|
||||||
|
tmsgReleaseHandle(&ctx->ctrlConnInfo, TAOS_CONN_SERVER);
|
||||||
|
}
|
||||||
|
|
||||||
ctx->ctrlConnInfo.handle = NULL;
|
ctx->ctrlConnInfo.handle = NULL;
|
||||||
ctx->ctrlConnInfo.refId = -1;
|
ctx->ctrlConnInfo.refId = -1;
|
||||||
|
|
||||||
|
@ -333,7 +336,7 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
|
||||||
QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
|
QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
|
||||||
}
|
}
|
||||||
|
|
||||||
qwFreeTask(QW_FPARAMS(), &octx);
|
qwFreeTaskCtx(QW_FPARAMS(), &octx);
|
||||||
|
|
||||||
QW_TASK_DLOG_E("task ctx dropped");
|
QW_TASK_DLOG_E("task ctx dropped");
|
||||||
|
|
||||||
|
|
|
@ -183,7 +183,7 @@ int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo)
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) {
|
int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) {
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
SRetrieveTableRsp *rsp = NULL;
|
SRetrieveTableRsp *rsp = NULL;
|
||||||
bool queryEnd = false;
|
bool queryEnd = false;
|
||||||
|
@ -242,6 +242,53 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SDeleteRes *pRes) {
|
||||||
|
int32_t len = 0;
|
||||||
|
SVDeleteRsp rsp = {0};
|
||||||
|
bool queryEnd = false;
|
||||||
|
int32_t code = 0;
|
||||||
|
SOutputData output = {0};
|
||||||
|
|
||||||
|
dsGetDataLength(ctx->sinkHandle, &len, &queryEnd);
|
||||||
|
|
||||||
|
if (len <= 0 || len != sizeof(SDeleterRes)) {
|
||||||
|
QW_TASK_ELOG("invalid length from dsGetDataLength, length:%d", len);
|
||||||
|
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||||
|
}
|
||||||
|
|
||||||
|
output.pData = taosMemoryCalloc(1, len);
|
||||||
|
if (NULL == output.pData) {
|
||||||
|
QW_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
|
||||||
|
code = dsGetDataBlock(ctx->sinkHandle, &output);
|
||||||
|
if (code) {
|
||||||
|
QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
|
||||||
|
taosMemoryFree(output.pData);
|
||||||
|
QW_ERR_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
SDeleterRes* pDelRes = (SDeleterRes*)output.pData;
|
||||||
|
|
||||||
|
rsp.affectedRows = pDelRes->affectedRows;
|
||||||
|
pRes->uid = pDelRes->uid;
|
||||||
|
pRes->uidList = pDelRes->uidList;
|
||||||
|
pRes->skey = pDelRes->skey;
|
||||||
|
pRes->ekey = pDelRes->ekey;
|
||||||
|
|
||||||
|
SEncoder coder = {0};
|
||||||
|
tEncodeSize(tEncodeSVDeleteRsp, &rsp, len, code);
|
||||||
|
void *msg = rpcMallocCont(len);
|
||||||
|
tEncoderInit(&coder, msg, len);
|
||||||
|
tEncodeSVDeleteRsp(&coder, &rsp);
|
||||||
|
tEncoderClear(&coder);
|
||||||
|
|
||||||
|
*rspMsg = msg;
|
||||||
|
*dataLen = len;
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
|
int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -547,7 +594,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||||
|
|
||||||
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
|
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
|
||||||
SOutputData sOutput = {0};
|
SOutputData sOutput = {0};
|
||||||
QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
|
QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
|
||||||
|
|
||||||
if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
|
if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
|
||||||
QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus));
|
QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus));
|
||||||
|
@ -620,7 +667,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||||
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
|
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
|
||||||
|
|
||||||
SOutputData sOutput = {0};
|
SOutputData sOutput = {0};
|
||||||
QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
|
QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
|
||||||
|
|
||||||
if (NULL == rsp) {
|
if (NULL == rsp) {
|
||||||
ctx->dataConnInfo = qwMsg->connInfo;
|
ctx->dataConnInfo = qwMsg->connInfo;
|
||||||
|
@ -875,6 +922,47 @@ _return:
|
||||||
qwRelease(refId);
|
qwRelease(refId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SRpcMsg *pRsp, SDeleteRes *pRes) {
|
||||||
|
int32_t code = 0;
|
||||||
|
SSubplan *plan = NULL;
|
||||||
|
qTaskInfo_t pTaskInfo = NULL;
|
||||||
|
DataSinkHandle sinkHandle = NULL;
|
||||||
|
SQWTaskCtx ctx = {0};
|
||||||
|
|
||||||
|
code = qStringToSubplan(qwMsg->msg, &plan);
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
|
QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code));
|
||||||
|
QW_ERR_JRET(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx.plan = plan;
|
||||||
|
|
||||||
|
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH);
|
||||||
|
if (code) {
|
||||||
|
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
|
||||||
|
QW_ERR_JRET(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (NULL == sinkHandle || NULL == pTaskInfo) {
|
||||||
|
QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle);
|
||||||
|
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx.taskHandle = pTaskInfo;
|
||||||
|
ctx.sinkHandle = sinkHandle;
|
||||||
|
|
||||||
|
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), &ctx, NULL));
|
||||||
|
|
||||||
|
QW_ERR_JRET(qwGetDeleteResFromSink(QW_FPARAMS(), &ctx, &pRsp->contLen, &pRsp->pCont, pRes));
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
|
qwFreeTaskCtx(QW_FPARAMS(), &ctx);
|
||||||
|
|
||||||
|
QW_RET(TSDB_CODE_SUCCESS);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) {
|
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) {
|
||||||
if (NULL == qWorkerMgmt || pMsgCb->mgmt == NULL) {
|
if (NULL == qWorkerMgmt || pMsgCb->mgmt == NULL) {
|
||||||
|
@ -1007,6 +1095,7 @@ int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pSt
|
||||||
pStat->fetchProcessed = QW_STAT_GET(mgmt->stat.msgStat.fetchProcessed);
|
pStat->fetchProcessed = QW_STAT_GET(mgmt->stat.msgStat.fetchProcessed);
|
||||||
pStat->dropProcessed = QW_STAT_GET(mgmt->stat.msgStat.dropProcessed);
|
pStat->dropProcessed = QW_STAT_GET(mgmt->stat.msgStat.dropProcessed);
|
||||||
pStat->hbProcessed = QW_STAT_GET(mgmt->stat.msgStat.hbProcessed);
|
pStat->hbProcessed = QW_STAT_GET(mgmt->stat.msgStat.hbProcessed);
|
||||||
|
pStat->deleteProcessed = QW_STAT_GET(mgmt->stat.msgStat.deleteProcessed);
|
||||||
|
|
||||||
pStat->numOfQueryInQueue = handle->pMsgCb->qsizeFp(handle->pMsgCb->mgmt, mgmt->nodeId, QUERY_QUEUE);
|
pStat->numOfQueryInQueue = handle->pMsgCb->qsizeFp(handle->pMsgCb->mgmt, mgmt->nodeId, QUERY_QUEUE);
|
||||||
pStat->numOfFetchInQueue = handle->pMsgCb->qsizeFp(handle->pMsgCb->mgmt, mgmt->nodeId, FETCH_QUEUE);
|
pStat->numOfFetchInQueue = handle->pMsgCb->qsizeFp(handle->pMsgCb->mgmt, mgmt->nodeId, FETCH_QUEUE);
|
||||||
|
|
|
@ -1476,7 +1476,7 @@ int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_
|
||||||
SSchJob *pJob = NULL;
|
SSchJob *pJob = NULL;
|
||||||
SCH_ERR_RET(schInitJob(&pJob, pDag, pTrans, pNodeList, sql, pRes, startTs, sync));
|
SCH_ERR_RET(schInitJob(&pJob, pDag, pTrans, pNodeList, sql, pRes, startTs, sync));
|
||||||
|
|
||||||
qDebug("QID:0x%" PRIx64 " jobId:0x%"PRIx64 " started", pDag->queryId, pJob->refId);
|
qDebug("QID:0x%" PRIx64 " job refId 0x%"PRIx64 " started", pDag->queryId, pJob->refId);
|
||||||
*job = pJob->refId;
|
*job = pJob->refId;
|
||||||
|
|
||||||
SCH_ERR_JRET(schLaunchJob(pJob));
|
SCH_ERR_JRET(schLaunchJob(pJob));
|
||||||
|
|
|
@ -62,10 +62,11 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
|
||||||
case TDMT_VND_DROP_TABLE_RSP:
|
case TDMT_VND_DROP_TABLE_RSP:
|
||||||
case TDMT_VND_ALTER_TABLE_RSP:
|
case TDMT_VND_ALTER_TABLE_RSP:
|
||||||
case TDMT_VND_SUBMIT_RSP:
|
case TDMT_VND_SUBMIT_RSP:
|
||||||
|
case TDMT_VND_DELETE_RSP:
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
SCH_TASK_ELOG("unknown rsp msg, type:%s, status:%s", TMSG_INFO(msgType), jobTaskStatusStr(taskStatus));
|
SCH_TASK_ELOG("unknown rsp msg, type:%s, status:%s", TMSG_INFO(msgType), jobTaskStatusStr(taskStatus));
|
||||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
SCH_ERR_RET(TSDB_CODE_INVALID_MSG);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (lastMsgType != reqMsgType) {
|
if (lastMsgType != reqMsgType) {
|
||||||
|
@ -227,6 +228,25 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case TDMT_VND_DELETE_RSP: {
|
||||||
|
SCH_ERR_JRET(rspCode);
|
||||||
|
|
||||||
|
if (msg) {
|
||||||
|
SDecoder coder = {0};
|
||||||
|
SVDeleteRsp rsp = {0};
|
||||||
|
tDecoderInit(&coder, msg, msgSize);
|
||||||
|
tDecodeSVDeleteRsp(&coder, &rsp);
|
||||||
|
|
||||||
|
atomic_add_fetch_32(&pJob->resNumOfRows, rsp.affectedRows);
|
||||||
|
SCH_TASK_DLOG("delete succeed, affectedRows:%" PRId64, rsp.affectedRows);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosMemoryFreeClear(msg);
|
||||||
|
|
||||||
|
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
case TDMT_VND_QUERY_RSP: {
|
case TDMT_VND_QUERY_RSP: {
|
||||||
SQueryTableRsp *rsp = (SQueryTableRsp *)msg;
|
SQueryTableRsp *rsp = (SQueryTableRsp *)msg;
|
||||||
|
|
||||||
|
@ -411,6 +431,10 @@ int32_t schHandleQueryCallback(void *param, const SDataBuf *pMsg, int32_t code)
|
||||||
return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code);
|
return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t schHandleDeleteCallback(void *param, const SDataBuf *pMsg, int32_t code) {
|
||||||
|
return schHandleCallback(param, pMsg, TDMT_VND_DELETE_RSP, code);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t schHandleFetchCallback(void *param, const SDataBuf *pMsg, int32_t code) {
|
int32_t schHandleFetchCallback(void *param, const SDataBuf *pMsg, int32_t code) {
|
||||||
return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code);
|
return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code);
|
||||||
}
|
}
|
||||||
|
@ -501,6 +525,9 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
|
||||||
case TDMT_VND_QUERY:
|
case TDMT_VND_QUERY:
|
||||||
*fp = schHandleQueryCallback;
|
*fp = schHandleQueryCallback;
|
||||||
break;
|
break;
|
||||||
|
case TDMT_VND_DELETE:
|
||||||
|
*fp = schHandleDeleteCallback;
|
||||||
|
break;
|
||||||
case TDMT_VND_EXPLAIN:
|
case TDMT_VND_EXPLAIN:
|
||||||
*fp = schHandleExplainCallback;
|
*fp = schHandleExplainCallback;
|
||||||
break;
|
break;
|
||||||
|
@ -982,6 +1009,26 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
case TDMT_VND_DELETE: {
|
||||||
|
SVDeleteReq req = {0};
|
||||||
|
req.header.vgId = addr->nodeId;
|
||||||
|
req.sId = schMgmt.sId;
|
||||||
|
req.queryId = pJob->queryId;
|
||||||
|
req.taskId = pTask->taskId;
|
||||||
|
req.phyLen = pTask->msgLen;
|
||||||
|
req.sqlLen = strlen(pJob->sql);
|
||||||
|
req.sql = (char*)pJob->sql;
|
||||||
|
req.msg = pTask->msg;
|
||||||
|
msgSize = tSerializeSVDeleteReq(NULL, 0, &req);
|
||||||
|
msg = taosMemoryCalloc(1, msgSize);
|
||||||
|
if (NULL == msg) {
|
||||||
|
SCH_TASK_ELOG("calloc %d failed", msgSize);
|
||||||
|
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
|
||||||
|
tSerializeSVDeleteReq(msg, msgSize, &req);
|
||||||
|
break;
|
||||||
|
}
|
||||||
case TDMT_VND_QUERY: {
|
case TDMT_VND_QUERY: {
|
||||||
SCH_ERR_RET(schMakeQueryRpcCtx(pJob, pTask, &rpcCtx));
|
SCH_ERR_RET(schMakeQueryRpcCtx(pJob, pTask, &rpcCtx));
|
||||||
|
|
||||||
|
|
|
@ -257,6 +257,8 @@ void schFreeRpcCtx(SRpcCtx *pCtx) {
|
||||||
|
|
||||||
taosHashCleanup(pCtx->args);
|
taosHashCleanup(pCtx->args);
|
||||||
|
|
||||||
(*pCtx->freeFunc)(pCtx->brokenVal.val);
|
if (pCtx->freeFunc) {
|
||||||
|
(*pCtx->freeFunc)(pCtx->brokenVal.val);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -25,6 +25,7 @@ extern "C" {
|
||||||
|
|
||||||
int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb);
|
int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb);
|
||||||
int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb);
|
int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb);
|
||||||
|
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* data);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,12 +57,14 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg*
|
||||||
}
|
}
|
||||||
|
|
||||||
// rsp by input status
|
// rsp by input status
|
||||||
SStreamDispatchRsp* pCont = rpcMallocCont(sizeof(SStreamDispatchRsp));
|
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
|
||||||
|
((SMsgHead*)buf)->vgId = htonl(pReq->sourceVg);
|
||||||
|
SStreamDispatchRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
pCont->inputStatus = status;
|
pCont->inputStatus = status;
|
||||||
pCont->streamId = pReq->streamId;
|
pCont->streamId = pReq->streamId;
|
||||||
pCont->taskId = pReq->sourceTaskId;
|
pCont->taskId = pReq->sourceTaskId;
|
||||||
pRsp->pCont = pCont;
|
pRsp->pCont = buf;
|
||||||
pRsp->contLen = sizeof(SStreamDispatchRsp);
|
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
|
||||||
tmsgSendRsp(pRsp);
|
tmsgSendRsp(pRsp);
|
||||||
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
|
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
|
||||||
}
|
}
|
||||||
|
@ -87,8 +89,12 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDisp
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchRsp* pRsp) {
|
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchRsp* pRsp) {
|
||||||
|
ASSERT(pRsp->inputStatus == TASK_OUTPUT_STATUS__NORMAL || pRsp->inputStatus == TASK_OUTPUT_STATUS__BLOCKED);
|
||||||
|
int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus);
|
||||||
|
ASSERT(old == TASK_OUTPUT_STATUS__WAIT);
|
||||||
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
||||||
// TODO: init recover timer
|
// TODO: init recover timer
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
// continue dispatch
|
// continue dispatch
|
||||||
streamSink1(pTask, pMsgCb);
|
streamSink1(pTask, pMsgCb);
|
||||||
|
|
|
@ -43,6 +43,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
|
||||||
if (output == NULL) break;
|
if (output == NULL) break;
|
||||||
// TODO: do we need free memory?
|
// TODO: do we need free memory?
|
||||||
SSDataBlock* outputCopy = createOneDataBlock(output, true);
|
SSDataBlock* outputCopy = createOneDataBlock(output, true);
|
||||||
|
outputCopy->info.childId = pTask->childId;
|
||||||
taosArrayPush(pRes, outputCopy);
|
taosArrayPush(pRes, outputCopy);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "tstream.h"
|
#include "streamInc.h"
|
||||||
|
|
||||||
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
|
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
|
@ -32,7 +32,7 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p
|
||||||
if (tEncodeBinary(pEncoder, data, len) < 0) return -1;
|
if (tEncodeBinary(pEncoder, data, len) < 0) return -1;
|
||||||
}
|
}
|
||||||
tEndEncode(pEncoder);
|
tEndEncode(pEncoder);
|
||||||
return 0;
|
return pEncoder->pos;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
|
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
|
||||||
|
@ -60,14 +60,168 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamBuildDispatchMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
|
static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) {
|
||||||
SStreamDispatchReq req = {
|
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
|
||||||
.streamId = pTask->streamId,
|
void* buf = taosMemoryCalloc(1, dataStrLen);
|
||||||
.data = data,
|
if (buf == NULL) return -1;
|
||||||
};
|
|
||||||
|
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
|
||||||
|
pRetrieve->useconds = 0;
|
||||||
|
pRetrieve->precision = TSDB_DEFAULT_PRECISION;
|
||||||
|
pRetrieve->compressed = 0;
|
||||||
|
pRetrieve->completed = 1;
|
||||||
|
pRetrieve->numOfRows = htonl(pBlock->info.rows);
|
||||||
|
|
||||||
|
int32_t actualLen = 0;
|
||||||
|
blockCompressEncode(pBlock, pRetrieve->data, &actualLen, pBlock->info.numOfCols, false);
|
||||||
|
actualLen += sizeof(SRetrieveTableRsp);
|
||||||
|
ASSERT(actualLen <= dataStrLen);
|
||||||
|
taosArrayPush(pReq->dataLen, &actualLen);
|
||||||
|
taosArrayPush(pReq->data, &buf);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
|
||||||
|
void* buf = NULL;
|
||||||
|
int32_t code = -1;
|
||||||
|
int32_t blockNum = taosArrayGetSize(data->blocks);
|
||||||
|
ASSERT(blockNum != 0);
|
||||||
|
|
||||||
|
SStreamDispatchReq req = {
|
||||||
|
.streamId = pTask->streamId,
|
||||||
|
.sourceTaskId = pTask->taskId,
|
||||||
|
.sourceVg = data->sourceVg,
|
||||||
|
.sourceChildId = pTask->childId,
|
||||||
|
.blockNum = blockNum,
|
||||||
|
};
|
||||||
|
|
||||||
|
req.data = taosArrayInit(blockNum, sizeof(void*));
|
||||||
|
req.dataLen = taosArrayInit(blockNum, sizeof(int32_t));
|
||||||
|
if (req.data == NULL || req.dataLen == NULL) {
|
||||||
|
goto FAIL;
|
||||||
|
}
|
||||||
|
for (int32_t i = 0; i < blockNum; i++) {
|
||||||
|
SSDataBlock* pDataBlock = taosArrayGet(data->blocks, i);
|
||||||
|
if (streamAddBlockToDispatchMsg(pDataBlock, &req) < 0) {
|
||||||
|
goto FAIL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
int32_t vgId = 0;
|
||||||
|
int32_t downstreamTaskId = 0;
|
||||||
|
// find ep
|
||||||
|
if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
|
||||||
|
vgId = pTask->fixedEpDispatcher.nodeId;
|
||||||
|
*ppEpSet = &pTask->fixedEpDispatcher.epSet;
|
||||||
|
downstreamTaskId = pTask->fixedEpDispatcher.taskId;
|
||||||
|
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
|
||||||
|
// TODO get ctbName
|
||||||
|
char ctbName[TSDB_TABLE_FNAME_LEN + 22] = {0};
|
||||||
|
SSDataBlock* pBlock = taosArrayGet(data->blocks, 0);
|
||||||
|
sprintf(ctbName, "%s:%ld", pTask->shuffleDispatcher.stbFullName, pBlock->info.groupId);
|
||||||
|
// get vg and ep
|
||||||
|
// TODO: get hash function by hashMethod
|
||||||
|
|
||||||
|
// get groupId, compute hash value
|
||||||
|
uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));
|
||||||
|
|
||||||
|
// get node
|
||||||
|
// TODO: optimize search process
|
||||||
|
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||||
|
int32_t sz = taosArrayGetSize(vgInfo);
|
||||||
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
||||||
|
if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
|
||||||
|
vgId = pVgInfo->vgId;
|
||||||
|
downstreamTaskId = pVgInfo->taskId;
|
||||||
|
*ppEpSet = &pVgInfo->epSet;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ASSERT(vgId != 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
req.taskId = downstreamTaskId;
|
||||||
|
|
||||||
|
// serialize
|
||||||
|
int32_t tlen;
|
||||||
|
tEncodeSize(tEncodeStreamDispatchReq, &req, tlen, code);
|
||||||
|
if (code < 0) goto FAIL;
|
||||||
|
code = -1;
|
||||||
|
buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
|
||||||
|
if (buf == NULL) {
|
||||||
|
goto FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
((SMsgHead*)buf)->vgId = htonl(vgId);
|
||||||
|
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
|
|
||||||
|
SEncoder encoder;
|
||||||
|
tEncoderInit(&encoder, abuf, tlen);
|
||||||
|
if ((code = tEncodeStreamDispatchReq(&encoder, &req)) < 0) {
|
||||||
|
goto FAIL;
|
||||||
|
}
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
|
||||||
|
pMsg->contLen = tlen + sizeof(SMsgHead);
|
||||||
|
pMsg->pCont = buf;
|
||||||
|
pMsg->msgType = pTask->dispatchMsgType;
|
||||||
|
|
||||||
|
code = 0;
|
||||||
|
FAIL:
|
||||||
|
if (code < 0 && buf) rpcFreeCont(buf);
|
||||||
|
if (req.data) taosArrayDestroyP(req.data, (FDelete)taosMemoryFree);
|
||||||
|
if (req.dataLen) taosArrayDestroy(req.dataLen);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* data) {
|
||||||
|
#if 0
|
||||||
|
int8_t old =
|
||||||
|
atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
|
||||||
|
if (old != TASK_OUTPUT_STATUS__NORMAL) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
|
||||||
|
SRpcMsg dispatchMsg = {0};
|
||||||
|
if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, NULL) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t qType;
|
||||||
|
if (pTask->dispatchMsgType == TDMT_VND_TASK_DISPATCH || pTask->dispatchMsgType == TDMT_SND_TASK_DISPATCH) {
|
||||||
|
qType = FETCH_QUEUE;
|
||||||
|
} else if (pTask->dispatchMsgType == TDMT_VND_TASK_DISPATCH_WRITE) {
|
||||||
|
qType = WRITE_QUEUE;
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
tmsgPutToQueue(pMsgCb, qType, &dispatchMsg);
|
||||||
|
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
|
||||||
|
SRpcMsg dispatchMsg = {0};
|
||||||
|
SEpSet* pEpSet = NULL;
|
||||||
|
if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, &pEpSet) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tmsgSendReq(pEpSet, &dispatchMsg);
|
||||||
|
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
|
||||||
|
SRpcMsg dispatchMsg = {0};
|
||||||
|
SEpSet* pEpSet = NULL;
|
||||||
|
if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, &pEpSet) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tmsgSendReq(pEpSet, &dispatchMsg);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
static int32_t streamBuildExecMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
|
static int32_t streamBuildExecMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
|
||||||
SStreamTaskExecReq req = {
|
SStreamTaskExecReq req = {
|
||||||
.streamId = pTask->streamId,
|
.streamId = pTask->streamId,
|
||||||
|
@ -148,3 +302,4 @@ static int32_t streamShuffleDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SHashOb
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
|
@ -13,8 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "executor.h"
|
#include "streamInc.h"
|
||||||
#include "tstream.h"
|
|
||||||
|
|
||||||
int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
||||||
SStreamQueue* queue;
|
SStreamQueue* queue;
|
||||||
|
@ -23,12 +22,13 @@ int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
||||||
} else {
|
} else {
|
||||||
queue = pTask->outputQueue;
|
queue = pTask->outputQueue;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*if (streamDequeueBegin(queue) == true) {*/
|
/*if (streamDequeueBegin(queue) == true) {*/
|
||||||
/*return -1;*/
|
/*return -1;*/
|
||||||
/*}*/
|
/*}*/
|
||||||
|
|
||||||
if (pTask->sinkType == TASK_SINK__TABLE || pTask->sinkType == TASK_SINK__SMA) {
|
if (pTask->sinkType == TASK_SINK__TABLE || pTask->sinkType == TASK_SINK__SMA ||
|
||||||
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
|
pTask->dispatchType != TASK_DISPATCH__NONE) {
|
||||||
while (1) {
|
while (1) {
|
||||||
SStreamDataBlock* pBlock = streamQueueNextItem(queue);
|
SStreamDataBlock* pBlock = streamQueueNextItem(queue);
|
||||||
if (pBlock == NULL) break;
|
if (pBlock == NULL) break;
|
||||||
|
@ -36,17 +36,19 @@ int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
||||||
|
|
||||||
// local sink
|
// local sink
|
||||||
if (pTask->sinkType == TASK_SINK__TABLE) {
|
if (pTask->sinkType == TASK_SINK__TABLE) {
|
||||||
|
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
|
||||||
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
|
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
|
||||||
} else if (pTask->sinkType == TASK_SINK__SMA) {
|
} else if (pTask->sinkType == TASK_SINK__SMA) {
|
||||||
|
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
|
||||||
pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pBlock->blocks);
|
pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pBlock->blocks);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
|
// TODO: sink and dispatch should be only one
|
||||||
ASSERT(queue == pTask->outputQueue);
|
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
|
||||||
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
|
|
||||||
ASSERT(queue == pTask->outputQueue);
|
|
||||||
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
|
|
||||||
ASSERT(queue == pTask->outputQueue);
|
ASSERT(queue == pTask->outputQueue);
|
||||||
|
ASSERT(pTask->sinkType == TASK_SINK__NONE);
|
||||||
|
|
||||||
|
streamDispatch(pTask, pMsgCb, pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamQueueProcessSuccess(queue);
|
streamQueueProcessSuccess(queue);
|
||||||
|
|
Loading…
Reference in New Issue