fix: hash put duplicated issue

This commit is contained in:
dapan1121 2024-11-14 16:37:10 +08:00
parent 34cfca745b
commit 0995facd64
8 changed files with 40 additions and 20 deletions

View File

@ -87,7 +87,7 @@ typedef struct SOutputData {
* @param pHandle output * @param pHandle output
* @return error code * @return error code
*/ */
int32_t dsCreateDataSinker(void* pSinkManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id); int32_t dsCreateDataSinker(void* pSinkManager, SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id);
int32_t dsDataSinkGetCacheSize(SDataSinkStat* pStat); int32_t dsDataSinkGetCacheSize(SDataSinkStat* pStat);

View File

@ -52,7 +52,7 @@ typedef struct SDataSinkHandle {
FGetSinkFlags fGetFlags; FGetSinkFlags fGetFlags;
} SDataSinkHandle; } SDataSinkHandle;
int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle); int32_t createDataDispatcher(SDataSinkManager* pManager, SDataSinkNode* pDataSink, DataSinkHandle* pHandle);
int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle,
void* pParam); void* pParam);
int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle,

View File

@ -356,6 +356,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
(void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDispatcher->cachedSize); (void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDispatcher->cachedSize);
taosMemoryFreeClear(pDispatcher->nextOutput.pData); taosMemoryFreeClear(pDispatcher->nextOutput.pData);
nodesDestroyNode((SNode*)pDispatcher->pSchema);
while (!taosQueueEmpty(pDispatcher->pDataBlocks)) { while (!taosQueueEmpty(pDispatcher->pDataBlocks)) {
SDataDispatchBuf* pBuf = NULL; SDataDispatchBuf* pBuf = NULL;
@ -436,7 +437,7 @@ int32_t getOutputColCounts(SDataBlockDescNode* pInputDataBlockDesc) {
return numOfCols; return numOfCols;
} }
int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle) { int32_t createDataDispatcher(SDataSinkManager* pManager, SDataSinkNode* pDataSink, DataSinkHandle* pHandle) {
int32_t code; int32_t code;
code = blockDescNodeCheck(pDataSink->pInputDataBlockDesc); code = blockDescNodeCheck(pDataSink->pInputDataBlockDesc);
if (code) { if (code) {
@ -460,6 +461,7 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD
dispatcher->pManager = pManager; dispatcher->pManager = pManager;
pManager = NULL; pManager = NULL;
dispatcher->pSchema = pDataSink->pInputDataBlockDesc; dispatcher->pSchema = pDataSink->pInputDataBlockDesc;
pDataSink->pInputDataBlockDesc = NULL;
dispatcher->outPutColCounts = getOutputColCounts(dispatcher->pSchema); dispatcher->outPutColCounts = getOutputColCounts(dispatcher->pSchema);
dispatcher->status = DS_BUF_EMPTY; dispatcher->status = DS_BUF_EMPTY;
dispatcher->queryEnd = false; dispatcher->queryEnd = false;

View File

@ -39,7 +39,7 @@ int32_t dsDataSinkGetCacheSize(SDataSinkStat* pStat) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t dsCreateDataSinker(void* pSinkManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id) { int32_t dsCreateDataSinker(void* pSinkManager, SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id) {
SDataSinkManager* pManager = pSinkManager; SDataSinkManager* pManager = pSinkManager;
switch ((int)nodeType(pDataSink)) { switch ((int)nodeType(pDataSink)) {
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:

View File

@ -534,6 +534,8 @@ void qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx);
void qwRetireJob(SQWJobInfo* pJob); void qwRetireJob(SQWJobInfo* pJob);
void qwDestroySession(QW_FPARAMS_DEF, SQWJobInfo *pJobInfo, void* session); void qwDestroySession(QW_FPARAMS_DEF, SQWJobInfo *pJobInfo, void* session);
int32_t qwInitSession(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void** ppSession); int32_t qwInitSession(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void** ppSession);
void qwFreeTaskHandle(SQWTaskCtx *ctx);
void qwFreeSinkHandle(SQWTaskCtx *ctx);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -272,10 +272,10 @@ int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { return qwAddTask
void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx) { taosHashRelease(mgmt->ctxHash, ctx); } void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx) { taosHashRelease(mgmt->ctxHash, ctx); }
void qwFreeTaskHandle(SQWTaskCtx *ctx, qTaskInfo_t *taskHandle) { void qwFreeTaskHandle(SQWTaskCtx *ctx) {
// Note: free/kill may in RC // Note: free/kill may in RC
qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle); qTaskInfo_t otaskHandle = atomic_load_ptr(&ctx->taskHandle);
if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) { if (otaskHandle && otaskHandle == atomic_val_compare_exchange_ptr(&ctx->taskHandle, otaskHandle, NULL)) {
taosEnableFullMemPoolUsage(ctx->memPoolSession); taosEnableFullMemPoolUsage(ctx->memPoolSession);
qDestroyTask(otaskHandle); qDestroyTask(otaskHandle);
taosDisableFullMemPoolUsage(); taosDisableFullMemPoolUsage();
@ -284,6 +284,18 @@ void qwFreeTaskHandle(SQWTaskCtx *ctx, qTaskInfo_t *taskHandle) {
} }
} }
void qwFreeSinkHandle(SQWTaskCtx *ctx) {
// Note: free/kill may in RC
void* osinkHandle = atomic_load_ptr(&ctx->sinkHandle);
if (osinkHandle && osinkHandle == atomic_val_compare_exchange_ptr(&ctx->sinkHandle, osinkHandle, NULL)) {
QW_SINK_ENABLE_MEMPOOL(ctx);
dsDestroyDataSinker(osinkHandle);
QW_SINK_DISABLE_MEMPOOL();
qDebug("sink handle destroyed");
}
}
int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode) { int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
@ -308,16 +320,9 @@ void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
// NO need to release dataConnInfo // NO need to release dataConnInfo
qwFreeTaskHandle(ctx, &ctx->taskHandle); qwFreeTaskHandle(ctx);
if (ctx->sinkHandle) { qwFreeSinkHandle(ctx);
QW_SINK_ENABLE_MEMPOOL(ctx);
dsDestroyDataSinker(ctx->sinkHandle);
QW_SINK_DISABLE_MEMPOOL();
ctx->sinkHandle = NULL;
qDebug("sink handle destroyed");
}
taosArrayDestroy(ctx->tbInfo); taosArrayDestroy(ctx->tbInfo);

View File

@ -71,12 +71,13 @@ int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *re
int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
qTaskInfo_t taskHandle = ctx->taskHandle; qTaskInfo_t taskHandle = ctx->taskHandle;
int32_t code = TSDB_CODE_SUCCESS;
ctx->queryExecDone = true; ctx->queryExecDone = true;
if (TASK_TYPE_TEMP == ctx->taskType && taskHandle) { if (TASK_TYPE_TEMP == ctx->taskType && taskHandle) {
if (ctx->explain && !ctx->explainRsped) { if (ctx->explain && !ctx->explainRsped) {
QW_ERR_RET(qwSendExplainResponse(QW_FPARAMS(), ctx)); QW_ERR_JRET(qwSendExplainResponse(QW_FPARAMS(), ctx));
} }
if (!ctx->needFetch) { if (!ctx->needFetch) {
@ -86,7 +87,13 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
} }
} }
return TSDB_CODE_SUCCESS; _return:
if (!ctx->explain || ctx->explainRsped) {
qwFreeTaskHandle(ctx);
}
return code;
} }
int32_t qwSendQueryRsp(QW_FPARAMS_DEF, int32_t msgType, SQWTaskCtx *ctx, int32_t rspCode, bool quickRsp) { int32_t qwSendQueryRsp(QW_FPARAMS_DEF, int32_t msgType, SQWTaskCtx *ctx, int32_t rspCode, bool quickRsp) {
@ -483,6 +490,7 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg, int32
qwBuildFetchRsp(rsp, &sOutput, dataLen, rawLen, qComplete); qwBuildFetchRsp(rsp, &sOutput, dataLen, rawLen, qComplete);
if (qComplete) { if (qComplete) {
atomic_store_8((int8_t *)&ctx->queryEnd, true); atomic_store_8((int8_t *)&ctx->queryEnd, true);
qwFreeSinkHandle(ctx);
} }
} }
@ -868,6 +876,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
if (qComplete) { if (qComplete) {
atomic_store_8((int8_t *)&ctx->queryEnd, true); atomic_store_8((int8_t *)&ctx->queryEnd, true);
atomic_store_8((int8_t *)&ctx->queryContinue, 0); atomic_store_8((int8_t *)&ctx->queryContinue, 0);
qwFreeSinkHandle(ctx);
} }
qwMsg->connInfo = ctx->dataConnInfo; qwMsg->connInfo = ctx->dataConnInfo;
@ -957,6 +966,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
qwBuildFetchRsp(rsp, &sOutput, dataLen, rawDataLen, qComplete); qwBuildFetchRsp(rsp, &sOutput, dataLen, rawDataLen, qComplete);
if (qComplete) { if (qComplete) {
atomic_store_8((int8_t *)&ctx->queryEnd, true); atomic_store_8((int8_t *)&ctx->queryEnd, true);
qwFreeSinkHandle(ctx);
} }
} }
@ -1582,6 +1592,7 @@ int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64
qwBuildFetchRsp(rsp, &sOutput, dataLen, rawLen, qComplete); qwBuildFetchRsp(rsp, &sOutput, dataLen, rawLen, qComplete);
if (qComplete) { if (qComplete) {
atomic_store_8((int8_t *)&ctx->queryEnd, true); atomic_store_8((int8_t *)&ctx->queryEnd, true);
qwFreeSinkHandle(ctx);
} }
break; break;

View File

@ -363,7 +363,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo
if (pHashObj->enableUpdate) { if (pHashObj->enableUpdate) {
doUpdateHashNode(pHashObj, pe, prev, pNode, pNewNode); doUpdateHashNode(pHashObj, pe, prev, pNode, pNewNode);
} else { } else {
FREE_HASH_NODE(pHashObj->freeFp, pNewNode); taosMemoryFreeClear(pNewNode);
terrno = TSDB_CODE_DUP_KEY; terrno = TSDB_CODE_DUP_KEY;
code = terrno; code = terrno;
goto _exit; goto _exit;