Merge pull request #13387 from taosdata/feature/tq
fix(tmq): tq deserialize msg
This commit is contained in:
commit
fb93e68fb2
|
@ -2570,6 +2570,12 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
|
|||
buf = taosDecodeFixedI8(buf, &pRsp->withTbName);
|
||||
buf = taosDecodeFixedI8(buf, &pRsp->withSchema);
|
||||
buf = taosDecodeFixedI8(buf, &pRsp->withTag);
|
||||
if (pRsp->withTbName) {
|
||||
pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
||||
}
|
||||
if (pRsp->withSchema) {
|
||||
pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pRsp->blockNum; i++) {
|
||||
int32_t bLen = 0;
|
||||
|
@ -2579,20 +2585,14 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
|
|||
taosArrayPush(pRsp->blockDataLen, &bLen);
|
||||
taosArrayPush(pRsp->blockData, &data);
|
||||
if (pRsp->withSchema) {
|
||||
pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
||||
SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper));
|
||||
buf = taosDecodeSSchemaWrapper(buf, pSW);
|
||||
taosArrayPush(pRsp->blockSchema, &pSW);
|
||||
} else {
|
||||
pRsp->blockSchema = NULL;
|
||||
}
|
||||
if (pRsp->withTbName) {
|
||||
pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
||||
char* name = NULL;
|
||||
buf = taosDecodeString(buf, &name);
|
||||
taosArrayPush(pRsp->blockTbName, &name);
|
||||
} else {
|
||||
pRsp->blockTbName = NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,16 +31,16 @@ extern int32_t taosTmrThreads;
|
|||
|
||||
void *taosTmrInit(int32_t maxTmr, int32_t resoultion, int32_t longest, const char *label);
|
||||
|
||||
void taosTmrCleanUp(void *handle);
|
||||
|
||||
tmr_h taosTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void *param, void *handle);
|
||||
|
||||
bool taosTmrStop(tmr_h tmrId);
|
||||
|
||||
bool taosTmrStopA(tmr_h *timerId);
|
||||
bool taosTmrStopA(tmr_h *tmrId);
|
||||
|
||||
bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void *param, void *handle, tmr_h *pTmrId);
|
||||
|
||||
void taosTmrCleanUp(void *handle);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -81,6 +81,8 @@ typedef struct {
|
|||
// rpc info
|
||||
int64_t reqId;
|
||||
SRpcHandleInfo rpcInfo;
|
||||
tmr_h timerId;
|
||||
int8_t tmrStopped;
|
||||
// exec
|
||||
int8_t inputStatus;
|
||||
int8_t execStatus;
|
||||
|
@ -164,13 +166,12 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key);
|
|||
void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
|
||||
|
||||
// tqOffset
|
||||
STqOffsetStore* STqOffsetOpen(STqOffsetCfg*);
|
||||
void STqOffsetClose(STqOffsetStore*);
|
||||
|
||||
int64_t tqOffsetFetch(STqOffsetStore* pStore, const char* subscribeKey);
|
||||
int32_t tqOffsetCommit(STqOffsetStore* pStore, const char* subscribeKey, int64_t offset);
|
||||
int32_t tqOffsetPersist(STqOffsetStore* pStore, const char* subscribeKey);
|
||||
int32_t tqOffsetPersistAll(STqOffsetStore* pStore);
|
||||
STqOffsetStore* tqOffsetOpen(STqOffsetCfg*);
|
||||
void tqOffsetClose(STqOffsetStore*);
|
||||
int64_t tqOffsetFetch(STqOffsetStore* pStore, const char* subscribeKey);
|
||||
int32_t tqOffsetCommit(STqOffsetStore* pStore, const char* subscribeKey, int64_t offset);
|
||||
int32_t tqOffsetPersist(STqOffsetStore* pStore, const char* subscribeKey);
|
||||
int32_t tqOffsetPersistAll(STqOffsetStore* pStore);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -174,6 +174,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
|
||||
ASSERT(taosArrayGetSize(rsp.blockData) == rsp.blockNum);
|
||||
ASSERT(taosArrayGetSize(rsp.blockDataLen) == rsp.blockNum);
|
||||
if (rsp.withSchema) {
|
||||
ASSERT(taosArrayGetSize(rsp.blockSchema) == rsp.blockNum);
|
||||
}
|
||||
|
||||
rsp.rspOffset = fetchOffset;
|
||||
|
||||
|
|
|
@ -30,7 +30,7 @@ struct STqOffsetStore {
|
|||
SHashObj* pHash; // SHashObj<subscribeKey, offset>
|
||||
};
|
||||
|
||||
STqOffsetStore* STqOffsetOpen(STqOffsetCfg* pCfg) {
|
||||
STqOffsetStore* tqOffsetOpen(STqOffsetCfg* pCfg) {
|
||||
STqOffsetStore* pStore = taosMemoryMalloc(sizeof(STqOffsetStore));
|
||||
if (pStore == NULL) {
|
||||
return NULL;
|
||||
|
|
|
@ -15,6 +15,11 @@
|
|||
|
||||
#include "tq.h"
|
||||
|
||||
void tqTmrRspFunc(void* param, void* tmrId) {
|
||||
STqHandle* pHandle = (STqHandle*)param;
|
||||
atomic_store_8(&pHandle->pushHandle.tmrStopped, 1);
|
||||
}
|
||||
|
||||
int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) {
|
||||
// 1. guard and set status executing
|
||||
// 2. check processedVer
|
||||
|
@ -50,12 +55,15 @@ int32_t tqOpenPushHandle(STQ* pTq, STqHandle* pHandle) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
void tqPreparePush(STQ* pTq, STqHandle* pHandle, int64_t reqId, const SRpcHandleInfo* pInfo, int64_t processedVer) {
|
||||
int32_t tqPreparePush(STQ* pTq, STqHandle* pHandle, int64_t reqId, const SRpcHandleInfo* pInfo, int64_t processedVer,
|
||||
int64_t timeout) {
|
||||
memcpy(&pHandle->pushHandle.rpcInfo, pInfo, sizeof(SRpcHandleInfo));
|
||||
atomic_store_64(&pHandle->pushHandle.reqId, reqId);
|
||||
atomic_store_64(&pHandle->pushHandle.processedVer, processedVer);
|
||||
atomic_store_8(&pHandle->pushHandle.inputStatus, TASK_INPUT_STATUS__NORMAL);
|
||||
// set timeout timer
|
||||
atomic_store_8(&pHandle->pushHandle.tmrStopped, 0);
|
||||
taosTmrReset(tqTmrRspFunc, (int32_t)timeout, pHandle, tqMgmt.timer, &pHandle->pushHandle.timerId);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqEnqueue(STqHandle* pHandle, SStreamDataSubmit* pSubmit) {
|
||||
|
|
|
@ -69,6 +69,9 @@ int vnodeInit(int nthreads) {
|
|||
if (walInit() < 0) {
|
||||
return -1;
|
||||
}
|
||||
if (tqInit() < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -94,6 +97,9 @@ void vnodeCleanup() {
|
|||
taosMemoryFreeClear(vnodeGlobal.threads);
|
||||
taosThreadCondDestroy(&(vnodeGlobal.hasTask));
|
||||
taosThreadMutexDestroy(&(vnodeGlobal.mutex));
|
||||
|
||||
walCleanUp();
|
||||
tqCleanUp();
|
||||
}
|
||||
|
||||
int vnodeScheduleTask(int (*execute)(void*), void* arg) {
|
||||
|
@ -155,4 +161,4 @@ static void* loop(void* arg) {
|
|||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
#include "qworker.h"
|
||||
#include "dataSinkMgt.h"
|
||||
#include "executor.h"
|
||||
#include "planner.h"
|
||||
#include "query.h"
|
||||
#include "qwInt.h"
|
||||
#include "qwMsg.h"
|
||||
#include "qworker.h"
|
||||
#include "tcommon.h"
|
||||
#include "tmsg.h"
|
||||
#include "tname.h"
|
||||
|
@ -406,7 +406,6 @@ int32_t qwDropTask(QW_FPARAMS_DEF) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
void qwSetHbParam(int64_t refId, SQWHbParam **pParam) {
|
||||
int32_t paramIdx = 0;
|
||||
int32_t newParamIdx = 0;
|
||||
|
@ -430,11 +429,10 @@ void qwSetHbParam(int64_t refId, SQWHbParam **pParam) {
|
|||
*pParam = &gQwMgmt.param[paramIdx];
|
||||
}
|
||||
|
||||
|
||||
void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) {
|
||||
void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) {
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
char tbName[TSDB_TABLE_NAME_LEN];
|
||||
|
||||
|
||||
qGetQueriedTableSchemaVersion(pTaskInfo, dbFName, tbName, &ctx->tbInfo.sversion, &ctx->tbInfo.tversion);
|
||||
|
||||
if (dbFName[0] && tbName[0]) {
|
||||
|
@ -444,7 +442,6 @@ void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
void qwCloseRef(void) {
|
||||
taosWLockLatch(&gQwMgmt.lock);
|
||||
if (atomic_load_32(&gQwMgmt.qwNum) <= 0 && gQwMgmt.qwRef >= 0) {
|
||||
|
@ -454,13 +451,13 @@ void qwCloseRef(void) {
|
|||
taosWUnLockLatch(&gQwMgmt.lock);
|
||||
}
|
||||
|
||||
|
||||
void qwDestroySchStatus(SQWSchStatus *pStatus) { taosHashCleanup(pStatus->tasksHash); }
|
||||
|
||||
void qwDestroyImpl(void *pMgmt) {
|
||||
SQWorker *mgmt = (SQWorker *)pMgmt;
|
||||
|
||||
taosTmrStopA(&mgmt->hbTimer);
|
||||
taosTmrStop(mgmt->hbTimer);
|
||||
mgmt->hbTimer = NULL;
|
||||
taosTmrCleanUp(mgmt->timer);
|
||||
|
||||
// TODO STOP ALL QUERY
|
||||
|
@ -527,10 +524,10 @@ int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) {
|
|||
switch (type) {
|
||||
case QUERY_QUEUE:
|
||||
pStat = &mgmt->stat.msgStat.waitTime[0];
|
||||
return pStat->num ? (pStat->total/pStat->num) : 0;
|
||||
return pStat->num ? (pStat->total / pStat->num) : 0;
|
||||
case FETCH_QUEUE:
|
||||
pStat = &mgmt->stat.msgStat.waitTime[1];
|
||||
return pStat->num ? (pStat->total/pStat->num) : 0;
|
||||
return pStat->num ? (pStat->total / pStat->num) : 0;
|
||||
default:
|
||||
qError("unsupported queue type %d", type);
|
||||
}
|
||||
|
@ -538,5 +535,3 @@ int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -23,19 +23,19 @@
|
|||
#define DUMP_SCHEDULER_TIME_WINDOW 30000 // every 30sec, take a snap shot of task queue.
|
||||
|
||||
typedef struct {
|
||||
char label[TSDB_LABEL_LEN];
|
||||
tsem_t emptySem;
|
||||
tsem_t fullSem;
|
||||
char label[TSDB_LABEL_LEN];
|
||||
tsem_t emptySem;
|
||||
tsem_t fullSem;
|
||||
TdThreadMutex queueMutex;
|
||||
int32_t fullSlot;
|
||||
int32_t emptySlot;
|
||||
int32_t queueSize;
|
||||
int32_t numOfThreads;
|
||||
TdThread *qthread;
|
||||
SSchedMsg *queue;
|
||||
bool stop;
|
||||
void *pTmrCtrl;
|
||||
void *pTimer;
|
||||
int32_t fullSlot;
|
||||
int32_t emptySlot;
|
||||
int32_t queueSize;
|
||||
int32_t numOfThreads;
|
||||
TdThread *qthread;
|
||||
SSchedMsg *queue;
|
||||
bool stop;
|
||||
void *pTmrCtrl;
|
||||
void *pTimer;
|
||||
} SSchedQueue;
|
||||
|
||||
static void *taosProcessSchedQueue(void *param);
|
||||
|
@ -218,7 +218,8 @@ void taosCleanUpScheduler(void *param) {
|
|||
taosThreadMutexDestroy(&pSched->queueMutex);
|
||||
|
||||
if (pSched->pTimer) {
|
||||
taosTmrStopA(&pSched->pTimer);
|
||||
taosTmrStop(pSched->pTimer);
|
||||
pSched->pTimer = NULL;
|
||||
}
|
||||
|
||||
if (pSched->queue) taosMemoryFree(pSched->queue);
|
||||
|
|
Loading…
Reference in New Issue