Merge pull request #7416 from taosdata/hotfix/TD-6020

[TD-6020]fix taos crash issue
This commit is contained in:
dapan1121 2021-08-19 09:27:40 +08:00 committed by GitHub
commit 4e8f3b5de3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 50 additions and 31 deletions

View File

@ -50,6 +50,12 @@ void tscUnlockByThread(int64_t *lockedBy);
int tsInsertInitialCheck(SSqlObj *pSql); int tsInsertInitialCheck(SSqlObj *pSql);
void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs);
void tscFreeRetrieveSup(SSqlObj *pSql);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -2034,17 +2034,14 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
tscAsyncResultOnError(pSql); tscAsyncResultOnError(pSql);
} }
static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs) { void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs) {
assert(numOfSubs <= pSql->subState.numOfSub && numOfSubs >= 0); assert(numOfSubs <= pSql->subState.numOfSub && numOfSubs >= 0);
for(int32_t i = 0; i < numOfSubs; ++i) { for(int32_t i = 0; i < numOfSubs; ++i) {
SSqlObj* pSub = pSql->pSubs[i]; SSqlObj* pSub = pSql->pSubs[i];
assert(pSub != NULL); assert(pSub != NULL);
SRetrieveSupport* pSupport = pSub->param; tscFreeRetrieveSup(pSub);
tfree(pSupport->localBuffer);
tfree(pSupport);
taos_free_result(pSub); taos_free_result(pSub);
} }
@ -2562,7 +2559,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void tscFreeRetrieveSup(SSqlObj *pSql) { void tscFreeRetrieveSup(SSqlObj *pSql) {
SRetrieveSupport *trsupport = pSql->param; SRetrieveSupport *trsupport = pSql->param;
void* p = atomic_val_compare_exchange_ptr(&pSql->param, trsupport, 0); void* p = atomic_val_compare_exchange_ptr(&pSql->param, trsupport, 0);
@ -2720,33 +2717,43 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) { if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
int32_t code = pParentSql->res.code; int32_t code = pParentSql->res.code;
if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && pParentSql->retry < pParentSql->maxRetry) { SSqlObj *userSql = NULL;
// remove the cached tableMeta and vgroup id list, and then parse the sql again if (pParentSql->param) {
SSqlCmd* pParentCmd = &pParentSql->cmd; userSql = ((SRetrieveSupport*)pParentSql->param)->pParentSql;
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pParentCmd, 0); }
tscRemoveTableMetaBuf(pTableMetaInfo, pParentSql->self);
pParentCmd->pTableMetaMap = tscCleanupTableMetaMap(pParentCmd->pTableMetaMap); if (userSql == NULL) {
pParentCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); userSql = pParentSql;
}
pParentSql->res.code = TSDB_CODE_SUCCESS; if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && userSql->retry < userSql->maxRetry) {
pParentSql->retry++; if (userSql != pParentSql) {
tscFreeRetrieveSup(pParentSql);
}
tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", pParentSql->self, tscFreeSubobj(userSql);
tstrerror(code), pParentSql->retry); tfree(userSql->pSubs);
code = tsParseSql(pParentSql, true); userSql->res.code = TSDB_CODE_SUCCESS;
userSql->retry++;
tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", userSql->self,
tstrerror(code), userSql->retry);
tscResetSqlCmd(&userSql->cmd, true);
code = tsParseSql(userSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return; return;
} }
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pParentSql->res.code = code; userSql->res.code = code;
tscAsyncResultOnError(pParentSql); tscAsyncResultOnError(userSql);
return; return;
} }
executeQuery(pParentSql, pQueryInfo); pQueryInfo = tscGetQueryInfo(&userSql->cmd);
executeQuery(userSql, pQueryInfo);
} else { } else {
(*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.code); (*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.code);
} }

View File

@ -3767,8 +3767,7 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
int32_t index = ps->subqueryIndex; int32_t index = ps->subqueryIndex;
bool ret = subAndCheckDone(pSql, pParentSql, index); bool ret = subAndCheckDone(pSql, pParentSql, index);
tfree(ps); tscFreeRetrieveSup(pSql);
pSql->param = NULL;
if (!ret) { if (!ret) {
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" orderOfSub:%d completed, not all subquery finished", pParentSql->self, pSql->self, index); tscDebug("0x%"PRIx64" sub:0x%"PRIx64" orderOfSub:%d completed, not all subquery finished", pParentSql->self, pSql->self, index);
@ -3778,12 +3777,13 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
// todo refactor // todo refactor
tscDebug("0x%"PRIx64" all subquery response received, retry", pParentSql->self); tscDebug("0x%"PRIx64" all subquery response received, retry", pParentSql->self);
SSqlCmd* pParentCmd = &pParentSql->cmd; if (code && !((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && pParentSql->retry < pParentSql->maxRetry)) {
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pParentCmd, 0); tscAsyncResultOnError(pParentSql);
tscRemoveTableMetaBuf(pTableMetaInfo, pParentSql->self); return;
}
pParentCmd->pTableMetaMap = tscCleanupTableMetaMap(pParentCmd->pTableMetaMap); tscFreeSubobj(pParentSql);
pParentCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); tfree(pParentSql->pSubs);
pParentSql->res.code = TSDB_CODE_SUCCESS; pParentSql->res.code = TSDB_CODE_SUCCESS;
pParentSql->retry++; pParentSql->retry++;
@ -3791,6 +3791,9 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", pParentSql->self, tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", pParentSql->self,
tstrerror(code), pParentSql->retry); tstrerror(code), pParentSql->retry);
tscResetSqlCmd(&pParentSql->cmd, true);
code = tsParseSql(pParentSql, true); code = tsParseSql(pParentSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return; return;
@ -3802,7 +3805,8 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
return; return;
} }
SQueryInfo *pQueryInfo = tscGetQueryInfo(pParentCmd); SQueryInfo *pQueryInfo = tscGetQueryInfo(&pParentSql->cmd);
executeQuery(pParentSql, pQueryInfo); executeQuery(pParentSql, pQueryInfo);
return; return;
} }
@ -3825,9 +3829,11 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
} }
if (taosArrayGetSize(pQueryInfo->pUpstream) > 0) { // nest query. do execute it firstly if (taosArrayGetSize(pQueryInfo->pUpstream) > 0) { // nest query. do execute it firstly
assert(pSql->subState.numOfSub == 0);
pSql->subState.numOfSub = (int32_t) taosArrayGetSize(pQueryInfo->pUpstream); pSql->subState.numOfSub = (int32_t) taosArrayGetSize(pQueryInfo->pUpstream);
assert(pSql->pSubs == NULL);
pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES); pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
assert(pSql->subState.states == NULL);
pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(int8_t)); pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(int8_t));
code = pthread_mutex_init(&pSql->subState.mutex, NULL); code = pthread_mutex_init(&pSql->subState.mutex, NULL);