commit
f866f5d294
|
@ -33,16 +33,16 @@ extern "C" {
|
||||||
#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", DEBUG_TRACE, wDebugFlag, __VA_ARGS__); }}
|
#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", DEBUG_TRACE, wDebugFlag, __VA_ARGS__); }}
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
||||||
#define WAL_PROTO_VER 0
|
#define WAL_PROTO_VER 0
|
||||||
#define WAL_NOSUFFIX_LEN 20
|
#define WAL_NOSUFFIX_LEN 20
|
||||||
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1)
|
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1)
|
||||||
#define WAL_LOG_SUFFIX "log"
|
#define WAL_LOG_SUFFIX "log"
|
||||||
#define WAL_INDEX_SUFFIX "idx"
|
#define WAL_INDEX_SUFFIX "idx"
|
||||||
#define WAL_REFRESH_MS 1000
|
#define WAL_REFRESH_MS 1000
|
||||||
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
|
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
|
||||||
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
|
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
|
||||||
#define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL
|
#define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL
|
||||||
#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3)
|
#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3)
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
TAOS_WAL_WRITE = 1,
|
TAOS_WAL_WRITE = 1,
|
||||||
|
|
|
@ -186,10 +186,10 @@ void taos_free_result(TAOS_RES *res) {
|
||||||
destroyRequest(pRequest);
|
destroyRequest(pRequest);
|
||||||
} else if (TD_RES_TMQ_METADATA(res)) {
|
} else if (TD_RES_TMQ_METADATA(res)) {
|
||||||
SMqTaosxRspObj *pRsp = (SMqTaosxRspObj *)res;
|
SMqTaosxRspObj *pRsp = (SMqTaosxRspObj *)res;
|
||||||
if (pRsp->rsp.blockData) taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree);
|
taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree);
|
||||||
if (pRsp->rsp.blockDataLen) taosArrayDestroy(pRsp->rsp.blockDataLen);
|
taosArrayDestroy(pRsp->rsp.blockDataLen);
|
||||||
if (pRsp->rsp.withTbName) taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree);
|
taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree);
|
||||||
if (pRsp->rsp.withSchema) taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
||||||
// taosx
|
// taosx
|
||||||
taosArrayDestroy(pRsp->rsp.createTableLen);
|
taosArrayDestroy(pRsp->rsp.createTableLen);
|
||||||
taosArrayDestroyP(pRsp->rsp.createTableReq, taosMemoryFree);
|
taosArrayDestroyP(pRsp->rsp.createTableReq, taosMemoryFree);
|
||||||
|
@ -199,10 +199,10 @@ void taos_free_result(TAOS_RES *res) {
|
||||||
taosMemoryFree(pRsp);
|
taosMemoryFree(pRsp);
|
||||||
} else if (TD_RES_TMQ(res)) {
|
} else if (TD_RES_TMQ(res)) {
|
||||||
SMqRspObj *pRsp = (SMqRspObj *)res;
|
SMqRspObj *pRsp = (SMqRspObj *)res;
|
||||||
if (pRsp->rsp.blockData) taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree);
|
taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree);
|
||||||
if (pRsp->rsp.blockDataLen) taosArrayDestroy(pRsp->rsp.blockDataLen);
|
taosArrayDestroy(pRsp->rsp.blockDataLen);
|
||||||
if (pRsp->rsp.withTbName) taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree);
|
taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree);
|
||||||
if (pRsp->rsp.withSchema) taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
||||||
pRsp->resInfo.pRspMsg = NULL;
|
pRsp->resInfo.pRspMsg = NULL;
|
||||||
doFreeReqResultInfo(&pRsp->resInfo);
|
doFreeReqResultInfo(&pRsp->resInfo);
|
||||||
taosMemoryFree(pRsp);
|
taosMemoryFree(pRsp);
|
||||||
|
|
|
@ -814,24 +814,55 @@ int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
|
||||||
|
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
|
||||||
|
// do nothing
|
||||||
|
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
|
||||||
|
SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
|
||||||
|
tDeleteSMqAskEpRsp(&pEpRspWrapper->msg);
|
||||||
|
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
|
||||||
|
SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
|
||||||
|
taosArrayDestroyP(pRsp->dataRsp.blockData, taosMemoryFree);
|
||||||
|
taosArrayDestroy(pRsp->dataRsp.blockDataLen);
|
||||||
|
taosArrayDestroyP(pRsp->dataRsp.blockTbName, taosMemoryFree);
|
||||||
|
taosArrayDestroyP(pRsp->dataRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
||||||
|
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
|
||||||
|
SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
|
||||||
|
taosMemoryFree(pRsp->metaRsp.metaRsp);
|
||||||
|
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
|
||||||
|
SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
|
||||||
|
taosArrayDestroyP(pRsp->taosxRsp.blockData, taosMemoryFree);
|
||||||
|
taosArrayDestroy(pRsp->taosxRsp.blockDataLen);
|
||||||
|
taosArrayDestroyP(pRsp->taosxRsp.blockTbName, taosMemoryFree);
|
||||||
|
taosArrayDestroyP(pRsp->taosxRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
||||||
|
// taosx
|
||||||
|
taosArrayDestroy(pRsp->taosxRsp.createTableLen);
|
||||||
|
taosArrayDestroyP(pRsp->taosxRsp.createTableReq, taosMemoryFree);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void tmqClearUnhandleMsg(tmq_t* tmq) {
|
void tmqClearUnhandleMsg(tmq_t* tmq) {
|
||||||
SMqRspWrapper* msg = NULL;
|
SMqRspWrapper* rspWrapper = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
taosGetQitem(tmq->qall, (void**)&msg);
|
taosGetQitem(tmq->qall, (void**)&rspWrapper);
|
||||||
if (msg)
|
if (rspWrapper) {
|
||||||
taosFreeQitem(msg);
|
tmqFreeRspWrapper(rspWrapper);
|
||||||
else
|
taosFreeQitem(rspWrapper);
|
||||||
|
} else {
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
msg = NULL;
|
rspWrapper = NULL;
|
||||||
taosReadAllQitems(tmq->mqueue, tmq->qall);
|
taosReadAllQitems(tmq->mqueue, tmq->qall);
|
||||||
while (1) {
|
while (1) {
|
||||||
taosGetQitem(tmq->qall, (void**)&msg);
|
taosGetQitem(tmq->qall, (void**)&rspWrapper);
|
||||||
if (msg)
|
if (rspWrapper) {
|
||||||
taosFreeQitem(msg);
|
tmqFreeRspWrapper(rspWrapper);
|
||||||
else
|
taosFreeQitem(rspWrapper);
|
||||||
|
} else {
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1644,6 +1675,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset)
|
||||||
tDeleteSMqAskEpRsp(rspMsg);
|
tDeleteSMqAskEpRsp(rspMsg);
|
||||||
*pReset = true;
|
*pReset = true;
|
||||||
} else {
|
} else {
|
||||||
|
tmqFreeRspWrapper(rspWrapper);
|
||||||
*pReset = false;
|
*pReset = false;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -1695,6 +1727,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
} else {
|
} else {
|
||||||
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
||||||
pollRspWrapper->dataRsp.head.epoch, consumerEpoch);
|
pollRspWrapper->dataRsp.head.epoch, consumerEpoch);
|
||||||
|
tmqFreeRspWrapper(rspWrapper);
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
}
|
}
|
||||||
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
|
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
|
||||||
|
@ -1713,6 +1746,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
} else {
|
} else {
|
||||||
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
||||||
pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
|
pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
|
||||||
|
tmqFreeRspWrapper(rspWrapper);
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
}
|
}
|
||||||
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
|
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
|
||||||
|
@ -1743,6 +1777,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
} else {
|
} else {
|
||||||
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
||||||
pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
|
pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
|
||||||
|
tmqFreeRspWrapper(rspWrapper);
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -1794,7 +1829,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
||||||
while (1) {
|
while (1) {
|
||||||
tmqHandleAllDelayedTask(tmq);
|
tmqHandleAllDelayedTask(tmq);
|
||||||
if (tmqPollImpl(tmq, timeout) < 0) {
|
if (tmqPollImpl(tmq, timeout) < 0) {
|
||||||
tscDebug("return since poll err");
|
tscDebug("consumer:%" PRId64 " return since poll err", tmq->consumerId);
|
||||||
/*return NULL;*/
|
/*return NULL;*/
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -32,9 +32,9 @@ typedef struct SMetaStbStatsEntry {
|
||||||
} SMetaStbStatsEntry;
|
} SMetaStbStatsEntry;
|
||||||
|
|
||||||
typedef struct STagFilterResEntry {
|
typedef struct STagFilterResEntry {
|
||||||
uint64_t suid; // uid for super table
|
uint64_t suid; // uid for super table
|
||||||
SList list; // the linked list of md5 digest, extracted from the serialized tag query condition
|
SList list; // the linked list of md5 digest, extracted from the serialized tag query condition
|
||||||
uint32_t qTimes;// queried times for current super table
|
uint32_t qTimes; // queried times for current super table
|
||||||
} STagFilterResEntry;
|
} STagFilterResEntry;
|
||||||
|
|
||||||
struct SMetaCache {
|
struct SMetaCache {
|
||||||
|
@ -126,13 +126,14 @@ int32_t metaCacheOpen(SMeta* pMeta) {
|
||||||
goto _err2;
|
goto _err2;
|
||||||
}
|
}
|
||||||
|
|
||||||
pCache->sTagFilterResCache.pUidResCache = taosLRUCacheInit(5*1024*1024, -1, 0.5);
|
pCache->sTagFilterResCache.pUidResCache = taosLRUCacheInit(5 * 1024 * 1024, -1, 0.5);
|
||||||
if (pCache->sTagFilterResCache.pUidResCache == NULL) {
|
if (pCache->sTagFilterResCache.pUidResCache == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err2;
|
goto _err2;
|
||||||
}
|
}
|
||||||
|
|
||||||
pCache->sTagFilterResCache.pTableEntry = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
|
pCache->sTagFilterResCache.pTableEntry =
|
||||||
|
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
|
||||||
if (pCache->sTagFilterResCache.pTableEntry == NULL) {
|
if (pCache->sTagFilterResCache.pTableEntry == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err2;
|
goto _err2;
|
||||||
|
@ -419,7 +420,8 @@ int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1, bool* acquireRes) {
|
int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1,
|
||||||
|
bool* acquireRes) {
|
||||||
uint64_t* pBuf = pMeta->pCache->sTagFilterResCache.keyBuf;
|
uint64_t* pBuf = pMeta->pCache->sTagFilterResCache.keyBuf;
|
||||||
|
|
||||||
// generate the composed key for LRU cache
|
// generate the composed key for LRU cache
|
||||||
|
@ -428,8 +430,8 @@ int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pK
|
||||||
pBuf[0] = suid;
|
pBuf[0] = suid;
|
||||||
memcpy(&pBuf[1], pKey, keyLen);
|
memcpy(&pBuf[1], pKey, keyLen);
|
||||||
|
|
||||||
int32_t len = keyLen + sizeof(uint64_t);
|
int32_t len = keyLen + sizeof(uint64_t);
|
||||||
LRUHandle *pHandle = taosLRUCacheLookup(pCache, pBuf, len);
|
LRUHandle* pHandle = taosLRUCacheLookup(pCache, pBuf, len);
|
||||||
if (pHandle == NULL) {
|
if (pHandle == NULL) {
|
||||||
*acquireRes = 0;
|
*acquireRes = 0;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -439,7 +441,7 @@ int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pK
|
||||||
*acquireRes = 1;
|
*acquireRes = 1;
|
||||||
|
|
||||||
const char* p = taosLRUCacheValue(pMeta->pCache->sTagFilterResCache.pUidResCache, pHandle);
|
const char* p = taosLRUCacheValue(pMeta->pCache->sTagFilterResCache.pUidResCache, pHandle);
|
||||||
int32_t size = *(int32_t*) p;
|
int32_t size = *(int32_t*)p;
|
||||||
taosArrayAddBatch(pList1, p + sizeof(int32_t), size);
|
taosArrayAddBatch(pList1, p + sizeof(int32_t), size);
|
||||||
|
|
||||||
(*pEntry)->qTimes += 1;
|
(*pEntry)->qTimes += 1;
|
||||||
|
@ -467,12 +469,15 @@ int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pK
|
||||||
|
|
||||||
// remove the keys, of which query uid lists have been replaced already.
|
// remove the keys, of which query uid lists have been replaced already.
|
||||||
size_t s = taosArrayGetSize(pList);
|
size_t s = taosArrayGetSize(pList);
|
||||||
for(int32_t i = 0; i < s; ++i) {
|
for (int32_t i = 0; i < s; ++i) {
|
||||||
SListNode** p1 = taosArrayGet(pList, i);
|
SListNode** p1 = taosArrayGet(pList, i);
|
||||||
tdListPopNode(&(*pEntry)->list, *p1);
|
tdListPopNode(&(*pEntry)->list, *p1);
|
||||||
|
taosMemoryFree(*p1);
|
||||||
}
|
}
|
||||||
|
|
||||||
(*pEntry)->qTimes = 0; // reset the query times
|
(*pEntry)->qTimes = 0; // reset the query times
|
||||||
|
|
||||||
|
taosArrayDestroy(pList);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -487,7 +492,8 @@ static void freePayload(const void* key, size_t keyLen, void* value) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// check both the payload size and selectivity ratio
|
// check both the payload size and selectivity ratio
|
||||||
int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t payloadLen, double selectivityRatio) {
|
int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
|
||||||
|
int32_t payloadLen, double selectivityRatio) {
|
||||||
if (selectivityRatio > tsSelectivityRatio) {
|
if (selectivityRatio > tsSelectivityRatio) {
|
||||||
metaDebug("vgId:%d, suid:%" PRIu64
|
metaDebug("vgId:%d, suid:%" PRIu64
|
||||||
" failed to add to uid list cache, due to selectivity ratio %.2f less than threshold %.2f",
|
" failed to add to uid list cache, due to selectivity ratio %.2f less than threshold %.2f",
|
||||||
|
@ -525,9 +531,10 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int
|
||||||
ASSERT(sizeof(uint64_t) + keyLen == 24);
|
ASSERT(sizeof(uint64_t) + keyLen == 24);
|
||||||
|
|
||||||
// add to cache.
|
// add to cache.
|
||||||
taosLRUCacheInsert(pCache, pBuf, sizeof(uint64_t) + keyLen, pPayload, payloadLen, freePayload, NULL, TAOS_LRU_PRIORITY_LOW);
|
taosLRUCacheInsert(pCache, pBuf, sizeof(uint64_t) + keyLen, pPayload, payloadLen, freePayload, NULL,
|
||||||
metaDebug("vgId:%d, suid:%"PRIu64" list cache added into cache, total:%d, tables:%d", TD_VID(pMeta->pVnode),
|
TAOS_LRU_PRIORITY_LOW);
|
||||||
suid, (int32_t) taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));
|
metaDebug("vgId:%d, suid:%" PRIu64 " list cache added into cache, total:%d, tables:%d", TD_VID(pMeta->pVnode), suid,
|
||||||
|
(int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -539,7 +546,7 @@ int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t keyLen = sizeof(uint64_t) * 3;
|
int32_t keyLen = sizeof(uint64_t) * 3;
|
||||||
uint64_t p[3] = {0};
|
uint64_t p[3] = {0};
|
||||||
p[0] = suid;
|
p[0] = suid;
|
||||||
|
|
||||||
|
|
|
@ -725,9 +725,15 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
|
||||||
}
|
}
|
||||||
taosWUnLockLatch(&pTq->pushLock);
|
taosWUnLockLatch(&pTq->pushLock);
|
||||||
|
|
||||||
code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
|
STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
|
||||||
if (code != 0) {
|
if (pHandle) {
|
||||||
tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
|
if (pHandle->pRef) {
|
||||||
|
walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
|
||||||
|
}
|
||||||
|
code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
|
||||||
|
if (code != 0) {
|
||||||
|
tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
|
code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
|
||||||
|
@ -736,7 +742,7 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
|
if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
|
||||||
ASSERT(0);
|
tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -314,7 +314,8 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t rowIndex = j - num;
|
int32_t rowIndex = j - num;
|
||||||
applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, pOperator->exprSupp.numOfExprs);
|
applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows,
|
||||||
|
pOperator->exprSupp.numOfExprs);
|
||||||
|
|
||||||
// assign the group keys or user input constant values if required
|
// assign the group keys or user input constant values if required
|
||||||
doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex);
|
doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex);
|
||||||
|
@ -331,7 +332,8 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t rowIndex = pBlock->info.rows - num;
|
int32_t rowIndex = pBlock->info.rows - num;
|
||||||
applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, pOperator->exprSupp.numOfExprs);
|
applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows,
|
||||||
|
pOperator->exprSupp.numOfExprs);
|
||||||
doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex);
|
doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -469,8 +471,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode*
|
||||||
initResultRowInfo(&pInfo->binfo.resultRowInfo);
|
initResultRowInfo(&pInfo->binfo.resultRowInfo);
|
||||||
setOperatorInfo(pOperator, "GroupbyAggOperator", 0, true, OP_NOT_OPENED, pInfo, pTaskInfo);
|
setOperatorInfo(pOperator, "GroupbyAggOperator", 0, true, OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashGroupbyAggregate, NULL, destroyGroupOperatorInfo,
|
||||||
createOperatorFpSet(optrDummyOpenFn, hashGroupbyAggregate, NULL, destroyGroupOperatorInfo, optrDefaultBufFn, NULL);
|
optrDefaultBufFn, NULL);
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
|
@ -776,6 +778,12 @@ static void destroyPartitionOperatorInfo(void* param) {
|
||||||
|
|
||||||
taosArrayDestroy(pInfo->pGroupColVals);
|
taosArrayDestroy(pInfo->pGroupColVals);
|
||||||
taosMemoryFree(pInfo->keyBuf);
|
taosMemoryFree(pInfo->keyBuf);
|
||||||
|
|
||||||
|
int32_t size = taosArrayGetSize(pInfo->sortedGroupArray);
|
||||||
|
for (int32_t i = 0; i < size; i++) {
|
||||||
|
SDataGroupInfo* pGp = taosArrayGet(pInfo->sortedGroupArray, i);
|
||||||
|
taosArrayDestroy(pGp->pPageList);
|
||||||
|
}
|
||||||
taosArrayDestroy(pInfo->sortedGroupArray);
|
taosArrayDestroy(pInfo->sortedGroupArray);
|
||||||
|
|
||||||
void* pGroupIter = taosHashIterate(pInfo->pGroupSet, NULL);
|
void* pGroupIter = taosHashIterate(pInfo->pGroupSet, NULL);
|
||||||
|
@ -850,7 +858,8 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
|
||||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, optrDefaultBufFn, NULL);
|
pOperator->fpSet =
|
||||||
|
createOperatorFpSet(optrDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, optrDefaultBufFn, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
@ -1141,8 +1150,8 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
|
||||||
pInfo, pTaskInfo);
|
pInfo, pTaskInfo);
|
||||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||||
pOperator->fpSet =
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamHashPartition, NULL,
|
||||||
createOperatorFpSet(optrDummyOpenFn, doStreamHashPartition, NULL, destroyStreamPartitionOperatorInfo, optrDefaultBufFn, NULL);
|
destroyStreamPartitionOperatorInfo, optrDefaultBufFn, NULL);
|
||||||
|
|
||||||
initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup);
|
initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup);
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
|
|
|
@ -31,7 +31,7 @@ extern "C" {
|
||||||
|
|
||||||
#define QW_DEFAULT_SCHEDULER_NUMBER 100
|
#define QW_DEFAULT_SCHEDULER_NUMBER 100
|
||||||
#define QW_DEFAULT_TASK_NUMBER 10000
|
#define QW_DEFAULT_TASK_NUMBER 10000
|
||||||
#define QW_DEFAULT_SCH_TASK_NUMBER 10000
|
#define QW_DEFAULT_SCH_TASK_NUMBER 3000
|
||||||
#define QW_DEFAULT_SHORT_RUN_TIMES 2
|
#define QW_DEFAULT_SHORT_RUN_TIMES 2
|
||||||
#define QW_DEFAULT_HEARTBEAT_MSEC 5000
|
#define QW_DEFAULT_HEARTBEAT_MSEC 5000
|
||||||
#define QW_SCH_TIMEOUT_MSEC 180000
|
#define QW_SCH_TIMEOUT_MSEC 180000
|
||||||
|
@ -247,7 +247,7 @@ typedef struct SQWorkerMgmt {
|
||||||
|
|
||||||
#define QW_ERR_RET(c) \
|
#define QW_ERR_RET(c) \
|
||||||
do { \
|
do { \
|
||||||
int32_t _code = (c); \
|
int32_t _code = (c); \
|
||||||
if (_code != TSDB_CODE_SUCCESS) { \
|
if (_code != TSDB_CODE_SUCCESS) { \
|
||||||
terrno = _code; \
|
terrno = _code; \
|
||||||
return _code; \
|
return _code; \
|
||||||
|
@ -255,7 +255,7 @@ typedef struct SQWorkerMgmt {
|
||||||
} while (0)
|
} while (0)
|
||||||
#define QW_RET(c) \
|
#define QW_RET(c) \
|
||||||
do { \
|
do { \
|
||||||
int32_t _code = (c); \
|
int32_t _code = (c); \
|
||||||
if (_code != TSDB_CODE_SUCCESS) { \
|
if (_code != TSDB_CODE_SUCCESS) { \
|
||||||
terrno = _code; \
|
terrno = _code; \
|
||||||
} \
|
} \
|
||||||
|
@ -263,7 +263,7 @@ typedef struct SQWorkerMgmt {
|
||||||
} while (0)
|
} while (0)
|
||||||
#define QW_ERR_JRET(c) \
|
#define QW_ERR_JRET(c) \
|
||||||
do { \
|
do { \
|
||||||
code = (c); \
|
code = (c); \
|
||||||
if (code != TSDB_CODE_SUCCESS) { \
|
if (code != TSDB_CODE_SUCCESS) { \
|
||||||
terrno = code; \
|
terrno = code; \
|
||||||
goto _return; \
|
goto _return; \
|
||||||
|
|
|
@ -281,7 +281,7 @@ void qwFreeTaskHandle(qTaskInfo_t *taskHandle) {
|
||||||
|
|
||||||
int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode) {
|
int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
// Note: free/kill may in RC
|
// Note: free/kill may in RC
|
||||||
qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle);
|
qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle);
|
||||||
if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) {
|
if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) {
|
||||||
|
@ -463,6 +463,8 @@ void qwDestroyImpl(void *pMgmt) {
|
||||||
int8_t nodeType = mgmt->nodeType;
|
int8_t nodeType = mgmt->nodeType;
|
||||||
int32_t nodeId = mgmt->nodeId;
|
int32_t nodeId = mgmt->nodeId;
|
||||||
|
|
||||||
|
int32_t taskCount = 0;
|
||||||
|
int32_t schStatusCount = 0;
|
||||||
qDebug("start to destroy qworker, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt);
|
qDebug("start to destroy qworker, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt);
|
||||||
|
|
||||||
taosTmrStop(mgmt->hbTimer);
|
taosTmrStop(mgmt->hbTimer);
|
||||||
|
@ -472,6 +474,7 @@ void qwDestroyImpl(void *pMgmt) {
|
||||||
uint64_t qId, tId;
|
uint64_t qId, tId;
|
||||||
int32_t eId;
|
int32_t eId;
|
||||||
void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
|
void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
|
||||||
|
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
|
SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
|
||||||
void *key = taosHashGetKey(pIter, NULL);
|
void *key = taosHashGetKey(pIter, NULL);
|
||||||
|
@ -480,6 +483,7 @@ void qwDestroyImpl(void *pMgmt) {
|
||||||
qwFreeTaskCtx(ctx);
|
qwFreeTaskCtx(ctx);
|
||||||
QW_TASK_DLOG_E("task ctx freed");
|
QW_TASK_DLOG_E("task ctx freed");
|
||||||
pIter = taosHashIterate(mgmt->ctxHash, pIter);
|
pIter = taosHashIterate(mgmt->ctxHash, pIter);
|
||||||
|
taskCount++;
|
||||||
}
|
}
|
||||||
taosHashCleanup(mgmt->ctxHash);
|
taosHashCleanup(mgmt->ctxHash);
|
||||||
|
|
||||||
|
@ -487,7 +491,9 @@ void qwDestroyImpl(void *pMgmt) {
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
SQWSchStatus *sch = (SQWSchStatus *)pIter;
|
SQWSchStatus *sch = (SQWSchStatus *)pIter;
|
||||||
qwDestroySchStatus(sch);
|
qwDestroySchStatus(sch);
|
||||||
|
|
||||||
pIter = taosHashIterate(mgmt->schHash, pIter);
|
pIter = taosHashIterate(mgmt->schHash, pIter);
|
||||||
|
schStatusCount++;
|
||||||
}
|
}
|
||||||
taosHashCleanup(mgmt->schHash);
|
taosHashCleanup(mgmt->schHash);
|
||||||
|
|
||||||
|
@ -499,7 +505,8 @@ void qwDestroyImpl(void *pMgmt) {
|
||||||
|
|
||||||
qwCloseRef();
|
qwCloseRef();
|
||||||
|
|
||||||
qDebug("qworker destroyed, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt);
|
qDebug("qworker destroyed, type:%d, id:%d, handle:%p, taskCount:%d, schStatusCount: %d", nodeType, nodeId, mgmt,
|
||||||
|
taskCount, schStatusCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwOpenRef(void) {
|
int32_t qwOpenRef(void) {
|
||||||
|
|
|
@ -8,9 +8,9 @@
|
||||||
#include "qwMsg.h"
|
#include "qwMsg.h"
|
||||||
#include "tcommon.h"
|
#include "tcommon.h"
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
|
#include "tglobal.h"
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
#include "tname.h"
|
#include "tname.h"
|
||||||
#include "tglobal.h"
|
|
||||||
|
|
||||||
SQWorkerMgmt gQwMgmt = {
|
SQWorkerMgmt gQwMgmt = {
|
||||||
.lock = 0,
|
.lock = 0,
|
||||||
|
@ -117,7 +117,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
|
||||||
if (queryStop) {
|
if (queryStop) {
|
||||||
*queryStop = true;
|
*queryStop = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -275,7 +275,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
|
||||||
QW_ERR_RET(code);
|
QW_ERR_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %"PRId64, pOutput->numOfBlocks,
|
QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %" PRId64, pOutput->numOfBlocks,
|
||||||
pOutput->numOfRows);
|
pOutput->numOfRows);
|
||||||
|
|
||||||
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
|
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
|
||||||
|
@ -327,12 +327,14 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
|
||||||
}
|
}
|
||||||
|
|
||||||
if (0 == ctx->level) {
|
if (0 == ctx->level) {
|
||||||
QW_TASK_DLOG("task fetched blocks %d rows %"PRId64", level %d", pOutput->numOfBlocks, pOutput->numOfRows, ctx->level);
|
QW_TASK_DLOG("task fetched blocks %d rows %" PRId64 ", level %d", pOutput->numOfBlocks, pOutput->numOfRows,
|
||||||
|
ctx->level);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pOutput->numOfRows >= QW_MIN_RES_ROWS) {
|
if (pOutput->numOfRows >= QW_MIN_RES_ROWS) {
|
||||||
QW_TASK_DLOG("task fetched blocks %d rows %" PRId64 " reaches the min rows", pOutput->numOfBlocks, pOutput->numOfRows);
|
QW_TASK_DLOG("task fetched blocks %d rows %" PRId64 " reaches the min rows", pOutput->numOfBlocks,
|
||||||
|
pOutput->numOfRows);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -538,7 +540,7 @@ _return:
|
||||||
SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo};
|
SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo};
|
||||||
qwDbgSimulateRedirect(&qwMsg, ctx, &rsped);
|
qwDbgSimulateRedirect(&qwMsg, ctx, &rsped);
|
||||||
qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped);
|
qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped);
|
||||||
if (!rsped) {
|
if (!rsped) {
|
||||||
qwSendQueryRsp(QW_FPARAMS(), input->msgType + 1, ctx, code, false);
|
qwSendQueryRsp(QW_FPARAMS(), input->msgType + 1, ctx, code, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -650,8 +652,8 @@ _return:
|
||||||
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
|
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
|
||||||
|
|
||||||
if (QUERY_RSP_POLICY_QUICK == tsQueryRspPolicy && ctx != NULL && QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
|
if (QUERY_RSP_POLICY_QUICK == tsQueryRspPolicy && ctx != NULL && QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
|
||||||
void *rsp = NULL;
|
void *rsp = NULL;
|
||||||
int32_t dataLen = 0;
|
int32_t dataLen = 0;
|
||||||
SOutputData sOutput = {0};
|
SOutputData sOutput = {0};
|
||||||
if (qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)) {
|
if (qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -671,8 +673,8 @@ _return:
|
||||||
qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, rsp, dataLen, code);
|
qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, rsp, dataLen, code);
|
||||||
rsp = NULL;
|
rsp = NULL;
|
||||||
|
|
||||||
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code,
|
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
|
||||||
tstrerror(code), dataLen);
|
dataLen);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -689,7 +691,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||||
|
|
||||||
do {
|
do {
|
||||||
ctx = NULL;
|
ctx = NULL;
|
||||||
|
|
||||||
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, NULL));
|
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, NULL));
|
||||||
|
|
||||||
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
|
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
|
||||||
|
@ -748,7 +750,8 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
QW_LOCK(QW_WRITE, &ctx->lock);
|
QW_LOCK(QW_WRITE, &ctx->lock);
|
||||||
if ((queryStop && (0 == atomic_load_8((int8_t *)&ctx->queryContinue))) || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) {
|
if ((queryStop && (0 == atomic_load_8((int8_t *)&ctx->queryContinue))) || code ||
|
||||||
|
0 == atomic_load_8((int8_t *)&ctx->queryContinue)) {
|
||||||
// Note: query is not running anymore
|
// Note: query is not running anymore
|
||||||
QW_SET_PHASE(ctx, 0);
|
QW_SET_PHASE(ctx, 0);
|
||||||
QW_UNLOCK(QW_WRITE, &ctx->lock);
|
QW_UNLOCK(QW_WRITE, &ctx->lock);
|
||||||
|
@ -1176,7 +1179,7 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) {
|
||||||
QW_DLOG("start to stop all tasks, taskNum:%d", taosHashGetSize(mgmt->ctxHash));
|
QW_DLOG("start to stop all tasks, taskNum:%d", taosHashGetSize(mgmt->ctxHash));
|
||||||
|
|
||||||
uint64_t qId, tId;
|
uint64_t qId, tId;
|
||||||
int32_t eId;
|
int32_t eId;
|
||||||
void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
|
void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
|
SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
|
||||||
|
@ -1186,7 +1189,7 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) {
|
||||||
QW_LOCK(QW_WRITE, &ctx->lock);
|
QW_LOCK(QW_WRITE, &ctx->lock);
|
||||||
|
|
||||||
QW_TASK_DLOG_E("start to force stop task");
|
QW_TASK_DLOG_E("start to force stop task");
|
||||||
|
|
||||||
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
|
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
|
||||||
QW_TASK_WLOG_E("task already dropping");
|
QW_TASK_WLOG_E("task already dropping");
|
||||||
QW_UNLOCK(QW_WRITE, &ctx->lock);
|
QW_UNLOCK(QW_WRITE, &ctx->lock);
|
||||||
|
@ -1194,7 +1197,7 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) {
|
||||||
pIter = taosHashIterate(mgmt->ctxHash, pIter);
|
pIter = taosHashIterate(mgmt->ctxHash, pIter);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (QW_QUERY_RUNNING(ctx)) {
|
if (QW_QUERY_RUNNING(ctx)) {
|
||||||
qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED);
|
qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED);
|
||||||
} else if (!QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
|
} else if (!QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
|
||||||
|
|
|
@ -223,6 +223,7 @@ void walClose(SWal *pWal) {
|
||||||
taosMemoryFree(pRef);
|
taosMemoryFree(pRef);
|
||||||
}
|
}
|
||||||
taosHashCleanup(pWal->pRefHash);
|
taosHashCleanup(pWal->pRefHash);
|
||||||
|
pWal->pRefHash = NULL;
|
||||||
taosThreadMutexUnlock(&pWal->mutex);
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
|
|
||||||
taosRemoveRef(tsWal.refSetId, pWal->refId);
|
taosRemoveRef(tsWal.refSetId, pWal->refId);
|
||||||
|
|
|
@ -32,15 +32,18 @@ SWalRef *walOpenRef(SWal *pWal) {
|
||||||
return pRef;
|
return pRef;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 1
|
|
||||||
void walCloseRef(SWal *pWal, int64_t refId) {
|
void walCloseRef(SWal *pWal, int64_t refId) {
|
||||||
SWalRef **ppRef = taosHashGet(pWal->pRefHash, &refId, sizeof(int64_t));
|
SWalRef **ppRef = taosHashGet(pWal->pRefHash, &refId, sizeof(int64_t));
|
||||||
if (ppRef == NULL) return;
|
if (ppRef == NULL) return;
|
||||||
SWalRef *pRef = *ppRef;
|
SWalRef *pRef = *ppRef;
|
||||||
|
if (pRef) {
|
||||||
|
wDebug("vgId:%d, wal close ref %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, pRef->refVer, pRef->refId);
|
||||||
|
} else {
|
||||||
|
wDebug("vgId:%d, wal close ref null, refId %" PRId64, pWal->cfg.vgId, refId);
|
||||||
|
}
|
||||||
taosHashRemove(pWal->pRefHash, &refId, sizeof(int64_t));
|
taosHashRemove(pWal->pRefHash, &refId, sizeof(int64_t));
|
||||||
taosMemoryFree(pRef);
|
taosMemoryFree(pRef);
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
|
|
||||||
int32_t walRefVer(SWalRef *pRef, int64_t ver) {
|
int32_t walRefVer(SWalRef *pRef, int64_t ver) {
|
||||||
SWal *pWal = pRef->pWal;
|
SWal *pWal = pRef->pWal;
|
||||||
|
|
Loading…
Reference in New Issue