change var name
This commit is contained in:
parent
4aa9c7adc9
commit
6fabe650af
|
@ -52,7 +52,7 @@ typedef struct {
|
||||||
SShellObj *pObj;
|
SShellObj *pObj;
|
||||||
int64_t offset; // offset relative the blks
|
int64_t offset; // offset relative the blks
|
||||||
char blks[];
|
char blks[];
|
||||||
} SBatchImportInfo;
|
} SBatchSubmitInfo;
|
||||||
|
|
||||||
void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
|
void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
|
||||||
int sid, vnode;
|
int sid, vnode;
|
||||||
|
@ -613,21 +613,21 @@ _submit_over:
|
||||||
if (pSubmit->import) { // Import case
|
if (pSubmit->import) { // Import case
|
||||||
if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
|
|
||||||
SBatchImportInfo *pImportInfo =
|
SBatchSubmitInfo *pSubmitInfo =
|
||||||
(SBatchImportInfo *)calloc(1, sizeof(SBatchImportInfo) + msgLen - sizeof(SShellSubmitMsg));
|
(SBatchSubmitInfo *)calloc(1, sizeof(SBatchSubmitInfo) + msgLen - sizeof(SShellSubmitMsg));
|
||||||
if (pImportInfo == NULL) {
|
if (pSubmitInfo == NULL) {
|
||||||
code = TSDB_CODE_SERV_OUT_OF_MEMORY;
|
code = TSDB_CODE_SERV_OUT_OF_MEMORY;
|
||||||
ret = vnodeSendShellSubmitRspMsg(pObj, code, pObj->numOfTotalPoints);
|
ret = vnodeSendShellSubmitRspMsg(pObj, code, pObj->numOfTotalPoints);
|
||||||
} else { // Start a timer to process the next part of request
|
} else { // Start a timer to process the next part of request
|
||||||
pImportInfo->import = 1;
|
pSubmitInfo->import = 1;
|
||||||
pImportInfo->vnode = pSubmit->vnode;
|
pSubmitInfo->vnode = pSubmit->vnode;
|
||||||
pImportInfo->numOfSid = pSubmit->numOfSid;
|
pSubmitInfo->numOfSid = pSubmit->numOfSid;
|
||||||
pImportInfo->ssid = i; // start from this position, not the initial position
|
pSubmitInfo->ssid = i; // start from this position, not the initial position
|
||||||
pImportInfo->pObj = pObj;
|
pSubmitInfo->pObj = pObj;
|
||||||
pImportInfo->offset = ((char *)pBlocks) - (pMsg + sizeof(SShellSubmitMsg));
|
pSubmitInfo->offset = ((char *)pBlocks) - (pMsg + sizeof(SShellSubmitMsg));
|
||||||
assert(pImportInfo->offset >= 0);
|
assert(pSubmitInfo->offset >= 0);
|
||||||
memcpy((void *)(pImportInfo->blks), (void *)(pMsg + sizeof(SShellSubmitMsg)), msgLen - sizeof(SShellSubmitMsg));
|
memcpy((void *)(pSubmitInfo->blks), (void *)(pMsg + sizeof(SShellSubmitMsg)), msgLen - sizeof(SShellSubmitMsg));
|
||||||
taosTmrStart(vnodeProcessBatchImportTimer, 10, (void *)pImportInfo, vnodeTmrCtrl);
|
taosTmrStart(vnodeProcessBatchImportTimer, 10, (void *)pSubmitInfo, vnodeTmrCtrl);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (code == TSDB_CODE_SUCCESS) assert(pObj->count == 0);
|
if (code == TSDB_CODE_SUCCESS) assert(pObj->count == 0);
|
||||||
|
@ -642,18 +642,18 @@ _submit_over:
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vnodeProcessBatchImportTimer(void *param, void *tmrId) {
|
static void vnodeProcessBatchImportTimer(void *param, void *tmrId) {
|
||||||
SBatchImportInfo *pImportInfo = (SBatchImportInfo *)param;
|
SBatchSubmitInfo *pSubmitInfo = (SBatchSubmitInfo *)param;
|
||||||
assert(pImportInfo != NULL && pImportInfo->import);
|
assert(pSubmitInfo != NULL && pSubmitInfo->import);
|
||||||
|
|
||||||
int32_t i = 0, numOfPoints = 0;
|
int32_t i = 0, numOfPoints = 0;
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
SShellObj * pShell = pImportInfo->pObj;
|
SShellObj * pShell = pSubmitInfo->pObj;
|
||||||
SVnodeObj * pVnode = &vnodeList[pImportInfo->vnode];
|
SVnodeObj * pVnode = &vnodeList[pSubmitInfo->vnode];
|
||||||
SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(pImportInfo->blks + pImportInfo->offset);
|
SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(pSubmitInfo->blks + pSubmitInfo->offset);
|
||||||
TSKEY now = taosGetTimestamp(pVnode->cfg.precision);
|
TSKEY now = taosGetTimestamp(pVnode->cfg.precision);
|
||||||
|
|
||||||
for (i = pImportInfo->ssid; i < pImportInfo->numOfSid; i++) {
|
for (i = pSubmitInfo->ssid; i < pSubmitInfo->numOfSid; i++) {
|
||||||
numOfPoints = 0;
|
numOfPoints = 0;
|
||||||
|
|
||||||
code = vnodeCheckSubmitBlockContext(pBlocks, pVnode);
|
code = vnodeCheckSubmitBlockContext(pBlocks, pVnode);
|
||||||
|
@ -676,9 +676,9 @@ static void vnodeProcessBatchImportTimer(void *param, void *tmrId) {
|
||||||
|
|
||||||
int ret = 0;
|
int ret = 0;
|
||||||
if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
pImportInfo->ssid = i;
|
pSubmitInfo->ssid = i;
|
||||||
pImportInfo->offset = ((char *)pBlocks) - pImportInfo->blks;
|
pSubmitInfo->offset = ((char *)pBlocks) - pSubmitInfo->blks;
|
||||||
taosTmrStart(vnodeProcessBatchImportTimer, 10, (void *)pImportInfo, vnodeTmrCtrl);
|
taosTmrStart(vnodeProcessBatchImportTimer, 10, (void *)pSubmitInfo, vnodeTmrCtrl);
|
||||||
} else {
|
} else {
|
||||||
if (code == TSDB_CODE_SUCCESS) assert(pShell->count == 0);
|
if (code == TSDB_CODE_SUCCESS) assert(pShell->count == 0);
|
||||||
tfree(param);
|
tfree(param);
|
||||||
|
|
Loading…
Reference in New Issue