Merge branch 'fix/liaohj' into main
This commit is contained in:
commit
fca79dbc53
|
@ -149,7 +149,6 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table
|
|||
* @param handle
|
||||
* @return
|
||||
*/
|
||||
|
||||
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch* pLocal);
|
||||
|
||||
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pBlock, uint64_t* useconds);
|
||||
|
@ -162,6 +161,7 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo);
|
|||
* @return
|
||||
*/
|
||||
int32_t qAsyncKillTask(qTaskInfo_t tinfo, int32_t rspCode);
|
||||
int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode);
|
||||
|
||||
bool qTaskIsExecuting(qTaskInfo_t qinfo);
|
||||
|
||||
|
@ -171,14 +171,6 @@ bool qTaskIsExecuting(qTaskInfo_t qinfo);
|
|||
*/
|
||||
void qDestroyTask(qTaskInfo_t tinfo);
|
||||
|
||||
/**
|
||||
* Extract the qualified table id list, and than pass them to the TSDB driver to load the required table data blocks.
|
||||
*
|
||||
* @param iter the table iterator to traverse all tables belongs to a super table, or an invert index
|
||||
* @return
|
||||
*/
|
||||
int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t tagCondLen, SArray* pTableIdList);
|
||||
|
||||
void qProcessRspMsg(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
|
||||
|
||||
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList /*,int32_t* resNum, SExplainExecInfo** pRes*/);
|
||||
|
|
|
@ -69,14 +69,6 @@ void* taosArrayAddBatch(SArray* pArray, const void* pData, int32_t nEles);
|
|||
*/
|
||||
void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*));
|
||||
|
||||
/**
|
||||
*
|
||||
* @param pArray
|
||||
* @param comparFn
|
||||
* @param fp
|
||||
*/
|
||||
void taosArrayRemoveDuplicateP(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*));
|
||||
|
||||
/**
|
||||
* add all element from the source array list into the destination
|
||||
* @param pArray
|
||||
|
@ -252,14 +244,6 @@ void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* par
|
|||
int32_t taosEncodeArray(void** buf, const SArray* pArray, FEncode encode);
|
||||
void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t dataSz);
|
||||
|
||||
/**
|
||||
* swap array
|
||||
* @param a
|
||||
* @param b
|
||||
* @return
|
||||
*/
|
||||
void taosArraySwap(SArray* a, SArray* b);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -601,6 +601,13 @@ static void* topicNameDup(void* p){
|
|||
return taosStrdup((char*) p);
|
||||
}
|
||||
|
||||
static void freeItem(void* param) {
|
||||
void* pItem = *(void**)param;
|
||||
if (pItem != NULL) {
|
||||
taosMemoryFree(pItem);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
char *msgStr = pMsg->pCont;
|
||||
|
@ -616,7 +623,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
|||
int32_t code = -1;
|
||||
SArray *pTopicList = subscribe.topicNames;
|
||||
taosArraySort(pTopicList, taosArrayCompareString);
|
||||
taosArrayRemoveDuplicateP(pTopicList, taosArrayCompareString, taosMemoryFree);
|
||||
taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem);
|
||||
|
||||
int32_t newTopicNum = taosArrayGetSize(pTopicList);
|
||||
|
||||
|
|
|
@ -895,6 +895,12 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
|||
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
|
||||
req.newConsumerId);
|
||||
|
||||
// kill executing task
|
||||
qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
|
||||
if (pTaskInfo != NULL) {
|
||||
qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
taosWLockLatch(&pTq->lock);
|
||||
atomic_store_32(&pHandle->epoch, -1);
|
||||
|
||||
|
@ -905,7 +911,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
|||
atomic_add_fetch_32(&pHandle->epoch, 1);
|
||||
|
||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
qStreamCloseTsdbReader(pHandle->execHandle.task);
|
||||
qStreamCloseTsdbReader(pTaskInfo);
|
||||
}
|
||||
|
||||
taosWUnLockLatch(&pTq->lock);
|
||||
|
|
|
@ -65,16 +65,17 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
|
|||
const STqExecHandle* pExec = &pHandle->execHandle;
|
||||
|
||||
qTaskInfo_t task = pExec->task;
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
|
||||
if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
|
||||
tqDebug("prepare scan failed, return");
|
||||
tqDebug("prepare scan failed, return, consumer:0x%"PRIx64, pHandle->consumerId);
|
||||
if (pOffset->type == TMQ_OFFSET__LOG) {
|
||||
pRsp->rspOffset = *pOffset;
|
||||
return 0;
|
||||
} else {
|
||||
tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
|
||||
if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
|
||||
tqDebug("prepare scan failed, return");
|
||||
tqDebug("prepare scan failed, return, consumer:0x%"PRIx64, pHandle->consumerId);
|
||||
pRsp->rspOffset = *pOffset;
|
||||
return 0;
|
||||
}
|
||||
|
@ -86,13 +87,14 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
|
|||
SSDataBlock* pDataBlock = NULL;
|
||||
uint64_t ts = 0;
|
||||
|
||||
tqDebug("vgId:%d, tmq task start to execute", pTq->pVnode->config.vgId);
|
||||
tqDebug("vgId:%d, tmq task start to execute, consumer:0x%"PRIx64, vgId, pHandle->consumerId);
|
||||
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
||||
tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, terrstr());
|
||||
tqError("vgId:%d, task exec error since %s, consumer:0x%" PRIx64, vgId, terrstr(),
|
||||
pHandle->consumerId);
|
||||
return -1;
|
||||
}
|
||||
|
||||
tqDebug("consumer:0x%"PRIx64" vgId:%d, tmq task executed, get %p", pHandle->consumerId, pTq->pVnode->config.vgId, pDataBlock);
|
||||
tqDebug("consumer:0x%"PRIx64" vgId:%d, tmq task executed, get %p", pHandle->consumerId, vgId, pDataBlock);
|
||||
|
||||
// current scan should be stopped asap, since the rebalance occurs.
|
||||
if (pDataBlock == NULL) {
|
||||
|
@ -115,15 +117,16 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
|
|||
}
|
||||
|
||||
if (pRsp->rspOffset.type == 0) {
|
||||
tqError("expected rsp offset: type %d %" PRId64 " %" PRId64 " %" PRId64, pRsp->rspOffset.type, pRsp->rspOffset.ts,
|
||||
pRsp->rspOffset.uid, pRsp->rspOffset.version);
|
||||
tqError("vgId:%d, expected rsp offset: type %d %" PRId64 " %" PRId64 " %" PRId64, vgId, pRsp->rspOffset.type,
|
||||
pRsp->rspOffset.ts, pRsp->rspOffset.uid, pRsp->rspOffset.version);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pRsp->withTbName || pRsp->withSchema) {
|
||||
tqError("get column should not with meta:%d,%d", pRsp->withTbName, pRsp->withSchema);
|
||||
tqError("vgId:%d, get column should not with meta:%d,%d", vgId, pRsp->withTbName, pRsp->withSchema);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -749,6 +749,23 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
if (pTaskInfo == NULL) {
|
||||
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||
}
|
||||
|
||||
qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));
|
||||
setTaskKilled(pTaskInfo, rspCode);
|
||||
|
||||
while(qTaskIsExecuting(pTaskInfo)) {
|
||||
taosMsleep(10);
|
||||
}
|
||||
|
||||
pTaskInfo->code = rspCode;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
bool qTaskIsExecuting(qTaskInfo_t qinfo) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
|
||||
if (NULL == pTaskInfo) {
|
||||
|
|
|
@ -633,7 +633,7 @@ void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pB
|
|||
}
|
||||
}
|
||||
|
||||
bool isTaskKilled(SExecTaskInfo* pTaskInfo) { return (0 != pTaskInfo->code) ? true : false; }
|
||||
bool isTaskKilled(SExecTaskInfo* pTaskInfo) { return (0 != pTaskInfo->code);}
|
||||
|
||||
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) { pTaskInfo->code = rspCode; }
|
||||
|
||||
|
|
|
@ -140,7 +140,7 @@ void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp)
|
|||
|
||||
taosArraySet(pArray, pos + 1, p2);
|
||||
memset(TARRAY_GET_ELEM(pArray, i), 0, pArray->elemSize);
|
||||
pos += 1;
|
||||
pos += 1;
|
||||
} else {
|
||||
pos += 1;
|
||||
}
|
||||
|
@ -157,45 +157,6 @@ void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp)
|
|||
pArray->size = pos + 1;
|
||||
}
|
||||
|
||||
void taosArrayRemoveDuplicateP(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*)) {
|
||||
size_t size = pArray->size;
|
||||
if (size <= 1) {
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t pos = 0;
|
||||
for (int32_t i = 1; i < size; ++i) {
|
||||
char* p1 = taosArrayGet(pArray, pos);
|
||||
char* p2 = taosArrayGet(pArray, i);
|
||||
|
||||
if (comparFn(p1, p2) == 0) {
|
||||
// do nothing
|
||||
} else {
|
||||
if (pos + 1 != i) {
|
||||
void* p = taosArrayGetP(pArray, pos + 1);
|
||||
if (fp != NULL) {
|
||||
fp(p);
|
||||
}
|
||||
|
||||
taosArraySet(pArray, pos + 1, p2);
|
||||
memset(TARRAY_GET_ELEM(pArray, i), 0, pArray->elemSize);
|
||||
pos += 1;
|
||||
} else {
|
||||
pos += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (fp != NULL) {
|
||||
for (int32_t i = pos + 1; i < pArray->size; ++i) {
|
||||
void* p = taosArrayGetP(pArray, i);
|
||||
fp(p);
|
||||
}
|
||||
}
|
||||
|
||||
pArray->size = pos + 1;
|
||||
}
|
||||
|
||||
void* taosArrayAddAll(SArray* pArray, const SArray* pInput) {
|
||||
if (pInput) {
|
||||
return taosArrayAddBatch(pArray, pInput->pData, (int32_t)taosArrayGetSize(pInput));
|
||||
|
@ -392,20 +353,6 @@ void taosArrayClearEx(SArray* pArray, void (*fp)(void*)) {
|
|||
pArray->size = 0;
|
||||
}
|
||||
|
||||
void taosArrayClearP(SArray* pArray, FDelete fp) {
|
||||
if (pArray == NULL) return;
|
||||
if (fp == NULL) {
|
||||
pArray->size = 0;
|
||||
return;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pArray->size; ++i) {
|
||||
fp(*(void**)TARRAY_GET_ELEM(pArray, i));
|
||||
}
|
||||
|
||||
pArray->size = 0;
|
||||
}
|
||||
|
||||
void* taosArrayDestroy(SArray* pArray) {
|
||||
if (pArray) {
|
||||
taosMemoryFree(pArray->pData);
|
||||
|
@ -495,6 +442,7 @@ static void taosArrayInsertSort(SArray* pArray, __ext_compar_fn_t fn, const void
|
|||
if (pArray->size <= 1) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (int32_t i = 1; i <= pArray->size - 1; ++i) {
|
||||
for (int32_t j = i; j > 0; --j) {
|
||||
if (fn(taosArrayGetP(pArray, j), taosArrayGetP(pArray, j - 1), param) == -1) {
|
||||
|
@ -507,7 +455,6 @@ static void taosArrayInsertSort(SArray* pArray, __ext_compar_fn_t fn, const void
|
|||
}
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t taosEncodeArray(void** buf, const SArray* pArray, FEncode encode) {
|
||||
|
@ -539,21 +486,3 @@ void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* par
|
|||
taosArrayGetSize(pArray) > 8 ? taosArrayQuickSort(pArray, fn, param) : taosArrayInsertSort(pArray, fn, param);
|
||||
}
|
||||
|
||||
void taosArraySwap(SArray* a, SArray* b) {
|
||||
if (a == NULL || b == NULL) return;
|
||||
size_t t = a->size;
|
||||
a->size = b->size;
|
||||
b->size = t;
|
||||
|
||||
uint32_t cap = a->capacity;
|
||||
a->capacity = b->capacity;
|
||||
b->capacity = cap;
|
||||
|
||||
uint32_t elem = a->elemSize;
|
||||
a->elemSize = b->elemSize;
|
||||
b->elemSize = elem;
|
||||
|
||||
void* data = a->pData;
|
||||
a->pData = b->pData;
|
||||
b->pData = data;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue