Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/row_refact
This commit is contained in:
commit
c64fb0ad81
|
@ -2570,6 +2570,12 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
|
||||||
buf = taosDecodeFixedI8(buf, &pRsp->withTbName);
|
buf = taosDecodeFixedI8(buf, &pRsp->withTbName);
|
||||||
buf = taosDecodeFixedI8(buf, &pRsp->withSchema);
|
buf = taosDecodeFixedI8(buf, &pRsp->withSchema);
|
||||||
buf = taosDecodeFixedI8(buf, &pRsp->withTag);
|
buf = taosDecodeFixedI8(buf, &pRsp->withTag);
|
||||||
|
if (pRsp->withTbName) {
|
||||||
|
pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
||||||
|
}
|
||||||
|
if (pRsp->withSchema) {
|
||||||
|
pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
||||||
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < pRsp->blockNum; i++) {
|
for (int32_t i = 0; i < pRsp->blockNum; i++) {
|
||||||
int32_t bLen = 0;
|
int32_t bLen = 0;
|
||||||
|
@ -2579,20 +2585,14 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
|
||||||
taosArrayPush(pRsp->blockDataLen, &bLen);
|
taosArrayPush(pRsp->blockDataLen, &bLen);
|
||||||
taosArrayPush(pRsp->blockData, &data);
|
taosArrayPush(pRsp->blockData, &data);
|
||||||
if (pRsp->withSchema) {
|
if (pRsp->withSchema) {
|
||||||
pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
|
||||||
SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper));
|
SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper));
|
||||||
buf = taosDecodeSSchemaWrapper(buf, pSW);
|
buf = taosDecodeSSchemaWrapper(buf, pSW);
|
||||||
taosArrayPush(pRsp->blockSchema, &pSW);
|
taosArrayPush(pRsp->blockSchema, &pSW);
|
||||||
} else {
|
|
||||||
pRsp->blockSchema = NULL;
|
|
||||||
}
|
}
|
||||||
if (pRsp->withTbName) {
|
if (pRsp->withTbName) {
|
||||||
pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
|
||||||
char* name = NULL;
|
char* name = NULL;
|
||||||
buf = taosDecodeString(buf, &name);
|
buf = taosDecodeString(buf, &name);
|
||||||
taosArrayPush(pRsp->blockTbName, &name);
|
taosArrayPush(pRsp->blockTbName, &name);
|
||||||
} else {
|
|
||||||
pRsp->blockTbName = NULL;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,16 +31,16 @@ extern int32_t taosTmrThreads;
|
||||||
|
|
||||||
void *taosTmrInit(int32_t maxTmr, int32_t resoultion, int32_t longest, const char *label);
|
void *taosTmrInit(int32_t maxTmr, int32_t resoultion, int32_t longest, const char *label);
|
||||||
|
|
||||||
|
void taosTmrCleanUp(void *handle);
|
||||||
|
|
||||||
tmr_h taosTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void *param, void *handle);
|
tmr_h taosTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void *param, void *handle);
|
||||||
|
|
||||||
bool taosTmrStop(tmr_h tmrId);
|
bool taosTmrStop(tmr_h tmrId);
|
||||||
|
|
||||||
bool taosTmrStopA(tmr_h *timerId);
|
bool taosTmrStopA(tmr_h *tmrId);
|
||||||
|
|
||||||
bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void *param, void *handle, tmr_h *pTmrId);
|
bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void *param, void *handle, tmr_h *pTmrId);
|
||||||
|
|
||||||
void taosTmrCleanUp(void *handle);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -81,6 +81,8 @@ typedef struct {
|
||||||
// rpc info
|
// rpc info
|
||||||
int64_t reqId;
|
int64_t reqId;
|
||||||
SRpcHandleInfo rpcInfo;
|
SRpcHandleInfo rpcInfo;
|
||||||
|
tmr_h timerId;
|
||||||
|
int8_t tmrStopped;
|
||||||
// exec
|
// exec
|
||||||
int8_t inputStatus;
|
int8_t inputStatus;
|
||||||
int8_t execStatus;
|
int8_t execStatus;
|
||||||
|
@ -164,9 +166,8 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key);
|
||||||
void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
|
void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
|
||||||
|
|
||||||
// tqOffset
|
// tqOffset
|
||||||
STqOffsetStore* STqOffsetOpen(STqOffsetCfg*);
|
STqOffsetStore* tqOffsetOpen(STqOffsetCfg*);
|
||||||
void STqOffsetClose(STqOffsetStore*);
|
void tqOffsetClose(STqOffsetStore*);
|
||||||
|
|
||||||
int64_t tqOffsetFetch(STqOffsetStore* pStore, const char* subscribeKey);
|
int64_t tqOffsetFetch(STqOffsetStore* pStore, const char* subscribeKey);
|
||||||
int32_t tqOffsetCommit(STqOffsetStore* pStore, const char* subscribeKey, int64_t offset);
|
int32_t tqOffsetCommit(STqOffsetStore* pStore, const char* subscribeKey, int64_t offset);
|
||||||
int32_t tqOffsetPersist(STqOffsetStore* pStore, const char* subscribeKey);
|
int32_t tqOffsetPersist(STqOffsetStore* pStore, const char* subscribeKey);
|
||||||
|
|
|
@ -174,6 +174,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
|
|
||||||
ASSERT(taosArrayGetSize(rsp.blockData) == rsp.blockNum);
|
ASSERT(taosArrayGetSize(rsp.blockData) == rsp.blockNum);
|
||||||
ASSERT(taosArrayGetSize(rsp.blockDataLen) == rsp.blockNum);
|
ASSERT(taosArrayGetSize(rsp.blockDataLen) == rsp.blockNum);
|
||||||
|
if (rsp.withSchema) {
|
||||||
|
ASSERT(taosArrayGetSize(rsp.blockSchema) == rsp.blockNum);
|
||||||
|
}
|
||||||
|
|
||||||
rsp.rspOffset = fetchOffset;
|
rsp.rspOffset = fetchOffset;
|
||||||
|
|
||||||
|
|
|
@ -30,7 +30,7 @@ struct STqOffsetStore {
|
||||||
SHashObj* pHash; // SHashObj<subscribeKey, offset>
|
SHashObj* pHash; // SHashObj<subscribeKey, offset>
|
||||||
};
|
};
|
||||||
|
|
||||||
STqOffsetStore* STqOffsetOpen(STqOffsetCfg* pCfg) {
|
STqOffsetStore* tqOffsetOpen(STqOffsetCfg* pCfg) {
|
||||||
STqOffsetStore* pStore = taosMemoryMalloc(sizeof(STqOffsetStore));
|
STqOffsetStore* pStore = taosMemoryMalloc(sizeof(STqOffsetStore));
|
||||||
if (pStore == NULL) {
|
if (pStore == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
|
@ -15,6 +15,11 @@
|
||||||
|
|
||||||
#include "tq.h"
|
#include "tq.h"
|
||||||
|
|
||||||
|
void tqTmrRspFunc(void* param, void* tmrId) {
|
||||||
|
STqHandle* pHandle = (STqHandle*)param;
|
||||||
|
atomic_store_8(&pHandle->pushHandle.tmrStopped, 1);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) {
|
int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) {
|
||||||
// 1. guard and set status executing
|
// 1. guard and set status executing
|
||||||
// 2. check processedVer
|
// 2. check processedVer
|
||||||
|
@ -50,12 +55,15 @@ int32_t tqOpenPushHandle(STQ* pTq, STqHandle* pHandle) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tqPreparePush(STQ* pTq, STqHandle* pHandle, int64_t reqId, const SRpcHandleInfo* pInfo, int64_t processedVer) {
|
int32_t tqPreparePush(STQ* pTq, STqHandle* pHandle, int64_t reqId, const SRpcHandleInfo* pInfo, int64_t processedVer,
|
||||||
|
int64_t timeout) {
|
||||||
memcpy(&pHandle->pushHandle.rpcInfo, pInfo, sizeof(SRpcHandleInfo));
|
memcpy(&pHandle->pushHandle.rpcInfo, pInfo, sizeof(SRpcHandleInfo));
|
||||||
atomic_store_64(&pHandle->pushHandle.reqId, reqId);
|
atomic_store_64(&pHandle->pushHandle.reqId, reqId);
|
||||||
atomic_store_64(&pHandle->pushHandle.processedVer, processedVer);
|
atomic_store_64(&pHandle->pushHandle.processedVer, processedVer);
|
||||||
atomic_store_8(&pHandle->pushHandle.inputStatus, TASK_INPUT_STATUS__NORMAL);
|
atomic_store_8(&pHandle->pushHandle.inputStatus, TASK_INPUT_STATUS__NORMAL);
|
||||||
// set timeout timer
|
atomic_store_8(&pHandle->pushHandle.tmrStopped, 0);
|
||||||
|
taosTmrReset(tqTmrRspFunc, (int32_t)timeout, pHandle, tqMgmt.timer, &pHandle->pushHandle.timerId);
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqEnqueue(STqHandle* pHandle, SStreamDataSubmit* pSubmit) {
|
int32_t tqEnqueue(STqHandle* pHandle, SStreamDataSubmit* pSubmit) {
|
||||||
|
|
|
@ -69,6 +69,9 @@ int vnodeInit(int nthreads) {
|
||||||
if (walInit() < 0) {
|
if (walInit() < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
if (tqInit() < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -94,6 +97,9 @@ void vnodeCleanup() {
|
||||||
taosMemoryFreeClear(vnodeGlobal.threads);
|
taosMemoryFreeClear(vnodeGlobal.threads);
|
||||||
taosThreadCondDestroy(&(vnodeGlobal.hasTask));
|
taosThreadCondDestroy(&(vnodeGlobal.hasTask));
|
||||||
taosThreadMutexDestroy(&(vnodeGlobal.mutex));
|
taosThreadMutexDestroy(&(vnodeGlobal.mutex));
|
||||||
|
|
||||||
|
walCleanUp();
|
||||||
|
tqCleanUp();
|
||||||
}
|
}
|
||||||
|
|
||||||
int vnodeScheduleTask(int (*execute)(void*), void* arg) {
|
int vnodeScheduleTask(int (*execute)(void*), void* arg) {
|
||||||
|
|
|
@ -1,10 +1,10 @@
|
||||||
#include "qworker.h"
|
|
||||||
#include "dataSinkMgt.h"
|
#include "dataSinkMgt.h"
|
||||||
#include "executor.h"
|
#include "executor.h"
|
||||||
#include "planner.h"
|
#include "planner.h"
|
||||||
#include "query.h"
|
#include "query.h"
|
||||||
#include "qwInt.h"
|
#include "qwInt.h"
|
||||||
#include "qwMsg.h"
|
#include "qwMsg.h"
|
||||||
|
#include "qworker.h"
|
||||||
#include "tcommon.h"
|
#include "tcommon.h"
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
#include "tname.h"
|
#include "tname.h"
|
||||||
|
@ -406,7 +406,6 @@ int32_t qwDropTask(QW_FPARAMS_DEF) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void qwSetHbParam(int64_t refId, SQWHbParam **pParam) {
|
void qwSetHbParam(int64_t refId, SQWHbParam **pParam) {
|
||||||
int32_t paramIdx = 0;
|
int32_t paramIdx = 0;
|
||||||
int32_t newParamIdx = 0;
|
int32_t newParamIdx = 0;
|
||||||
|
@ -430,7 +429,6 @@ void qwSetHbParam(int64_t refId, SQWHbParam **pParam) {
|
||||||
*pParam = &gQwMgmt.param[paramIdx];
|
*pParam = &gQwMgmt.param[paramIdx];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) {
|
void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) {
|
||||||
char dbFName[TSDB_DB_FNAME_LEN];
|
char dbFName[TSDB_DB_FNAME_LEN];
|
||||||
char tbName[TSDB_TABLE_NAME_LEN];
|
char tbName[TSDB_TABLE_NAME_LEN];
|
||||||
|
@ -444,7 +442,6 @@ void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void qwCloseRef(void) {
|
void qwCloseRef(void) {
|
||||||
taosWLockLatch(&gQwMgmt.lock);
|
taosWLockLatch(&gQwMgmt.lock);
|
||||||
if (atomic_load_32(&gQwMgmt.qwNum) <= 0 && gQwMgmt.qwRef >= 0) {
|
if (atomic_load_32(&gQwMgmt.qwNum) <= 0 && gQwMgmt.qwRef >= 0) {
|
||||||
|
@ -454,13 +451,13 @@ void qwCloseRef(void) {
|
||||||
taosWUnLockLatch(&gQwMgmt.lock);
|
taosWUnLockLatch(&gQwMgmt.lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void qwDestroySchStatus(SQWSchStatus *pStatus) { taosHashCleanup(pStatus->tasksHash); }
|
void qwDestroySchStatus(SQWSchStatus *pStatus) { taosHashCleanup(pStatus->tasksHash); }
|
||||||
|
|
||||||
void qwDestroyImpl(void *pMgmt) {
|
void qwDestroyImpl(void *pMgmt) {
|
||||||
SQWorker *mgmt = (SQWorker *)pMgmt;
|
SQWorker *mgmt = (SQWorker *)pMgmt;
|
||||||
|
|
||||||
taosTmrStopA(&mgmt->hbTimer);
|
taosTmrStop(mgmt->hbTimer);
|
||||||
|
mgmt->hbTimer = NULL;
|
||||||
taosTmrCleanUp(mgmt->timer);
|
taosTmrCleanUp(mgmt->timer);
|
||||||
|
|
||||||
// TODO STOP ALL QUERY
|
// TODO STOP ALL QUERY
|
||||||
|
@ -527,10 +524,10 @@ int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) {
|
||||||
switch (type) {
|
switch (type) {
|
||||||
case QUERY_QUEUE:
|
case QUERY_QUEUE:
|
||||||
pStat = &mgmt->stat.msgStat.waitTime[0];
|
pStat = &mgmt->stat.msgStat.waitTime[0];
|
||||||
return pStat->num ? (pStat->total/pStat->num) : 0;
|
return pStat->num ? (pStat->total / pStat->num) : 0;
|
||||||
case FETCH_QUEUE:
|
case FETCH_QUEUE:
|
||||||
pStat = &mgmt->stat.msgStat.waitTime[1];
|
pStat = &mgmt->stat.msgStat.waitTime[1];
|
||||||
return pStat->num ? (pStat->total/pStat->num) : 0;
|
return pStat->num ? (pStat->total / pStat->num) : 0;
|
||||||
default:
|
default:
|
||||||
qError("unsupported queue type %d", type);
|
qError("unsupported queue type %d", type);
|
||||||
}
|
}
|
||||||
|
@ -538,5 +535,3 @@ int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -218,7 +218,8 @@ void taosCleanUpScheduler(void *param) {
|
||||||
taosThreadMutexDestroy(&pSched->queueMutex);
|
taosThreadMutexDestroy(&pSched->queueMutex);
|
||||||
|
|
||||||
if (pSched->pTimer) {
|
if (pSched->pTimer) {
|
||||||
taosTmrStopA(&pSched->pTimer);
|
taosTmrStop(pSched->pTimer);
|
||||||
|
pSched->pTimer = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pSched->queue) taosMemoryFree(pSched->queue);
|
if (pSched->queue) taosMemoryFree(pSched->queue);
|
||||||
|
|
Loading…
Reference in New Issue