[td-225] fix bugs found in script
This commit is contained in:
parent
2b7cca801b
commit
db7261235c
|
@ -318,7 +318,6 @@ typedef struct SSqlObj {
|
||||||
char freed : 4;
|
char freed : 4;
|
||||||
char listed : 4;
|
char listed : 4;
|
||||||
tsem_t rspSem;
|
tsem_t rspSem;
|
||||||
pthread_mutex_t inUse; // make sure that one connection can only be utilized by one thread/process
|
|
||||||
SSqlCmd cmd;
|
SSqlCmd cmd;
|
||||||
SSqlRes res;
|
SSqlRes res;
|
||||||
uint16_t numOfSubs;
|
uint16_t numOfSubs;
|
||||||
|
|
|
@ -123,13 +123,6 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
|
||||||
|
|
||||||
tsem_init(&pSql->rspSem, 0, 0);
|
tsem_init(&pSql->rspSem, 0, 0);
|
||||||
|
|
||||||
pthread_mutexattr_t mutexattr;
|
|
||||||
memset(&mutexattr, 0, sizeof(pthread_mutexattr_t));
|
|
||||||
|
|
||||||
pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE_NP);
|
|
||||||
pthread_mutex_init(&pSql->inUse, &mutexattr);
|
|
||||||
pthread_mutexattr_destroy(&mutexattr);
|
|
||||||
|
|
||||||
pObj->pSql = pSql;
|
pObj->pSql = pSql;
|
||||||
pObj->pDnodeConn = pDnodeConn;
|
pObj->pDnodeConn = pDnodeConn;
|
||||||
|
|
||||||
|
@ -291,23 +284,11 @@ int taos_query(TAOS *taos, const char *sqlstr) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SSqlObj* pSql = pObj->pSql;
|
SSqlObj* pSql = pObj->pSql;
|
||||||
SSqlCmd* pCmd = &pSql->cmd;
|
size_t sqlLen = strlen(sqlstr);
|
||||||
|
|
||||||
// now this TAOS_CONN object is in use by one thread
|
|
||||||
pthread_mutex_lock(&pSql->inUse);
|
|
||||||
|
|
||||||
size_t sqlLen = strlen(sqlstr);
|
|
||||||
doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen);
|
doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen);
|
||||||
|
|
||||||
// wait for the callback function to post the semaphore
|
// wait for the callback function to post the semaphore
|
||||||
tsem_wait(&pSql->rspSem);
|
tsem_wait(&pSql->rspSem);
|
||||||
|
|
||||||
if (pCmd->command != TSDB_SQL_SELECT &&
|
|
||||||
pCmd->command != TSDB_SQL_SHOW &&
|
|
||||||
pCmd->command != TSDB_SQL_DESCRIBE_TABLE) {
|
|
||||||
pthread_mutex_unlock(&pSql->inUse);
|
|
||||||
}
|
|
||||||
|
|
||||||
return pSql->res.code;
|
return pSql->res.code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -565,7 +546,6 @@ void taos_free_result(TAOS_RES *res) {
|
||||||
tscFreeSqlObj(pSql);
|
tscFreeSqlObj(pSql);
|
||||||
} else {
|
} else {
|
||||||
tscPartiallyFreeSqlObj(pSql);
|
tscPartiallyFreeSqlObj(pSql);
|
||||||
pthread_mutex_unlock(&pSql->inUse); // now this TAOS_CONN can be used by other threads
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return;
|
return;
|
||||||
|
@ -596,9 +576,8 @@ void taos_free_result(TAOS_RES *res) {
|
||||||
if ((pCmd->command == TSDB_SQL_SELECT ||
|
if ((pCmd->command == TSDB_SQL_SELECT ||
|
||||||
pCmd->command == TSDB_SQL_SHOW ||
|
pCmd->command == TSDB_SQL_SHOW ||
|
||||||
pCmd->command == TSDB_SQL_RETRIEVE ||
|
pCmd->command == TSDB_SQL_RETRIEVE ||
|
||||||
pCmd->command == TSDB_SQL_FETCH) && pRes->code == TSDB_CODE_SUCCESS &&
|
pCmd->command == TSDB_SQL_FETCH) && pRes->code == TSDB_CODE_SUCCESS && pRes->completed == false &&
|
||||||
((pCmd->command < TSDB_SQL_LOCAL && pRes->completed == false) ||
|
(pCmd->command == TSDB_SQL_SELECT && pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL)) {
|
||||||
(pCmd->command == TSDB_SQL_SELECT && pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL))) {
|
|
||||||
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
|
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
|
||||||
|
|
||||||
tscTrace("%p send msg to free qhandle in vnode, code:%d, numOfRows:%d, command:%s", pSql, pRes->code, pRes->numOfRows,
|
tscTrace("%p send msg to free qhandle in vnode, code:%d, numOfRows:%d, command:%s", pSql, pRes->code, pRes->numOfRows,
|
||||||
|
|
|
@ -45,12 +45,13 @@ typedef struct SFillInfo {
|
||||||
int32_t numOfCols; // number of columns, including the tags columns
|
int32_t numOfCols; // number of columns, including the tags columns
|
||||||
int32_t rowSize; // size of each row
|
int32_t rowSize; // size of each row
|
||||||
char ** pTags; // tags value for current interpolation
|
char ** pTags; // tags value for current interpolation
|
||||||
|
int64_t slidingTime; // sliding value to determine the number of result for a given time window
|
||||||
int64_t slidingTime; // sliding value to determine the number of result for a given time window
|
|
||||||
char * prevValues; // previous row of data, to generate the interpolation results
|
char * prevValues; // previous row of data, to generate the interpolation results
|
||||||
char * nextValues; // next row of data
|
char * nextValues; // next row of data
|
||||||
|
char** pData; // original result data block involved in filling data
|
||||||
|
int32_t capacityInRows; // data buffer size in rows
|
||||||
|
|
||||||
SFillColInfo* pFillCol; // column info for fill operations
|
SFillColInfo* pFillCol; // column info for fill operations
|
||||||
char** pData; // original result data block involved in filling data
|
|
||||||
} SFillInfo;
|
} SFillInfo;
|
||||||
|
|
||||||
typedef struct SPoint {
|
typedef struct SPoint {
|
||||||
|
|
|
@ -79,7 +79,7 @@ SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_
|
||||||
int32_t rowsize = 0;
|
int32_t rowsize = 0;
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
int32_t bytes = pFillInfo->pFillCol[i].col.bytes;
|
int32_t bytes = pFillInfo->pFillCol[i].col.bytes;
|
||||||
pFillInfo->pData[i] = calloc(1, sizeof(tFilePage) + bytes * capacity);
|
pFillInfo->pData[i] = calloc(1, bytes * capacity);
|
||||||
|
|
||||||
rowsize += bytes;
|
rowsize += bytes;
|
||||||
}
|
}
|
||||||
|
@ -89,6 +89,8 @@ SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_
|
||||||
}
|
}
|
||||||
|
|
||||||
pFillInfo->rowSize = rowsize;
|
pFillInfo->rowSize = rowsize;
|
||||||
|
pFillInfo->capacityInRows = capacity;
|
||||||
|
|
||||||
return pFillInfo;
|
return pFillInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -119,6 +121,17 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey)
|
||||||
pFillInfo->rowIdx = 0;
|
pFillInfo->rowIdx = 0;
|
||||||
pFillInfo->endKey = endKey;
|
pFillInfo->endKey = endKey;
|
||||||
pFillInfo->numOfRows = numOfRows;
|
pFillInfo->numOfRows = numOfRows;
|
||||||
|
|
||||||
|
// ensure the space
|
||||||
|
if (pFillInfo->capacityInRows < numOfRows) {
|
||||||
|
for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
|
||||||
|
char* tmp = realloc(pFillInfo->pData[i], numOfRows*pFillInfo->pFillCol[i].col.bytes);
|
||||||
|
assert(tmp != NULL); // todo handle error
|
||||||
|
|
||||||
|
memset(tmp, 0, numOfRows*pFillInfo->pFillCol[i].col.bytes);
|
||||||
|
pFillInfo->pData[i] = tmp;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosFillCopyInputDataFromFilePage(SFillInfo* pFillInfo, tFilePage** pInput) {
|
void taosFillCopyInputDataFromFilePage(SFillInfo* pFillInfo, tFilePage** pInput) {
|
||||||
|
@ -474,11 +487,11 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t taosGenerateDataBlock(SFillInfo* pFillInfo, tFilePage** output, int32_t capacity) {
|
int64_t taosGenerateDataBlock(SFillInfo* pFillInfo, tFilePage** output, int32_t capacity) {
|
||||||
int32_t remain = taosNumOfRemainRows(pFillInfo); // todo use iterator?
|
int32_t remain = taosNumOfRemainRows(pFillInfo); // todo use iterator?
|
||||||
int32_t rows = taosGetNumOfResultWithFill(pFillInfo, remain, pFillInfo->endKey, capacity);
|
int32_t rows = taosGetNumOfResultWithFill(pFillInfo, remain, pFillInfo->endKey, capacity);
|
||||||
|
|
||||||
|
int32_t numOfRes = generateDataBlockImpl(pFillInfo, output, remain, rows, pFillInfo->pData);
|
||||||
|
assert(numOfRes == rows);
|
||||||
|
|
||||||
int32_t numOfRes = generateDataBlockImpl(pFillInfo, output, remain, rows, pFillInfo->pData);
|
return numOfRes;
|
||||||
assert(numOfRes == rows);
|
|
||||||
|
|
||||||
return numOfRes;
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue