Merge pull request #2380 from taosdata/feature/query
[td-225] fix bugs in insert.
This commit is contained in:
commit
cb5ebb9c6d
|
@ -1901,7 +1901,6 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
|
|||
tscTrace("%p submit data to %d vnode(s)", pSql, pDataBlocks->nSize);
|
||||
SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
|
||||
pState->numOfTotal = pSql->numOfSubs;
|
||||
|
||||
pState->numOfRemain = pSql->numOfSubs;
|
||||
|
||||
pRes->code = TSDB_CODE_SUCCESS;
|
||||
|
@ -1915,6 +1914,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
|
|||
pSql->pSubs[0] = pSql; // the first sub insert points back to itself
|
||||
tscTrace("%p sub:%p create subObj success. orderOfSub:%d", pSql, pSql, 0);
|
||||
|
||||
int32_t numOfSub = 1;
|
||||
int32_t code = tscCopyDataBlockToPayload(pSql, pDataBlocks->pData[0]);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%d", pSql, 0,
|
||||
|
@ -1922,15 +1922,14 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
|
|||
goto _error;
|
||||
}
|
||||
|
||||
int32_t i = 1;
|
||||
for (; i < pSql->numOfSubs; ++i) {
|
||||
for (; numOfSub < pSql->numOfSubs; ++numOfSub) {
|
||||
SInsertSupporter* pSupporter1 = calloc(1, sizeof(SInsertSupporter));
|
||||
pSupporter1->pSql = pSql;
|
||||
pSupporter1->pState = pState;
|
||||
|
||||
SSqlObj *pNew = createSubqueryObj(pSql, 0, multiVnodeInsertFinalize, pSupporter, TSDB_SQL_INSERT, NULL);
|
||||
SSqlObj *pNew = createSubqueryObj(pSql, 0, multiVnodeInsertFinalize, pSupporter1, TSDB_SQL_INSERT, NULL);
|
||||
if (pNew == NULL) {
|
||||
tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
|
||||
tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, numOfSub, strerror(errno));
|
||||
goto _error;
|
||||
}
|
||||
|
||||
|
@ -1939,26 +1938,25 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
|
|||
* the callback function (multiVnodeInsertFinalize) correctly.
|
||||
*/
|
||||
pNew->fetchFp = pNew->fp;
|
||||
pSql->pSubs[i] = pNew;
|
||||
pSql->pSubs[numOfSub] = pNew;
|
||||
|
||||
code = tscCopyDataBlockToPayload(pNew, pDataBlocks->pData[i]);
|
||||
code = tscCopyDataBlockToPayload(pNew, pDataBlocks->pData[numOfSub]);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%d", pSql, i,
|
||||
tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%d", pSql, numOfSub,
|
||||
pDataBlocks->nSize, code);
|
||||
goto _error;
|
||||
} else {
|
||||
tscTrace("%p sub:%p create subObj success. orderOfSub:%d", pSql, pNew, i);
|
||||
tscTrace("%p sub:%p create subObj success. orderOfSub:%d", pSql, pNew, numOfSub);
|
||||
}
|
||||
}
|
||||
|
||||
if (i < pSql->numOfSubs) {
|
||||
if (numOfSub < pSql->numOfSubs) {
|
||||
tscError("%p failed to prepare subObj structure and launch sub-insertion", pSql);
|
||||
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
return pRes->code; // free all allocated resource
|
||||
}
|
||||
|
||||
// use the local variable
|
||||
int32_t numOfSub = pSql->numOfSubs;
|
||||
for (int32_t j = 0; j < numOfSub; ++j) {
|
||||
SSqlObj *pSub = pSql->pSubs[j];
|
||||
tscTrace("%p sub:%p launch sub insert, orderOfSub:%d", pSql, pSub, j);
|
||||
|
@ -1970,11 +1968,12 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
|
|||
_error:
|
||||
// restore the udf fp
|
||||
pSql->fp = pSql->fetchFp;
|
||||
pSql->pSubs[0] = NULL;
|
||||
|
||||
tfree(pState);
|
||||
tfree(pSql->param);
|
||||
|
||||
for(int32_t j = 1; j < i; ++j) {
|
||||
for(int32_t j = 1; j < numOfSub; ++j) {
|
||||
tfree(pSql->pSubs[j]->param);
|
||||
taos_free_result(pSql->pSubs[j]);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue