INTERNAL: make piped insert operations process synchronously
๐ Related Issue
- https://github.com/jam2in/arcus-works/issues/540
โจ๏ธ What I did
-
lop/sop/mop/bop piped insert์ ํํด ์ ์ฉํ๋ PR์ ๋๋ค.
๋์ ๊ณผ์
-
500๊ฐ ์์ดํ ๋จ์๋ก Operation ๊ฐ์ฒด๋ฅผ ๋๋์ด ๋น๋๊ธฐ๋ก ์ผ์ ํ ์์ฒญ์ ๋ณด๋ด๋ ๊ฒ์ ๋๊ธฐ ๋ฐฉ์์ผ๋ก ํ๋์ฉ ๋ณด๋ด์ด Arcus ์๋ฒ์ ๊ณผ๋ํ ๋ถํ๊ฐ ๋ค์ด๊ฐ๋ ๊ฒ์ ๋ฐฉ์งํ๊ณ , ์คํจ ์ ๋ค์ Operation์ ์ํํ์ง ์๋๋ก ํฉ๋๋ค.
-
์ด์ operation์ด ์ฑ๊ณตํด์ผ๋ง ๋ค์ operation์ด writeQueue์ Future์ operation list์ ์ถ๊ฐ๋ฉ๋๋ค.
-
๋ง์ฝ CLIENT_ERROR, SERVER_ERROR ์คํจ๊ฐ ๋ฐ์ํ๋ฉด ๊ทธ ์ฆ์ latch.countdown์ ํธ์ถํด ๋จ์ operation์ ์ํํ์ง ์์ต๋๋ค.
-
OVERFLOWED, OUT_OF_RANGE ๊ฐ์ ์คํจ๊ฐ ๋ฐ์ํ๋ฉด ์ดํ operation์ ์ฒซ command๊ฐ NOT_EXECUTED ์ํ์์ future์ failedResult์ ์ถ๊ฐํฉ๋๋ค.
-
NOT_EXECUTED ์ดํ์ ๋ชจ๋ ์ฐ์ฐ์ ์คํ๋์ง ์์ ๊ฒ์ ๋๋ค. (CANCELED๊ฐ ์๋ NOT_EXECUTED๋ฅผ ์ฌ์ฉํ๋ ์ด์ ๋ ์ง์ง cancel ์ํฉ๊ณผ์ ํผ์ฉ์ ๋ง๊ณ ์ ํ๊ธฐ ์ํจ์ ๋๋ค.)
-
๋ณธ PR ์ด์ ์ pipe ๊ด๋ จ ํด๋์ค ๊ตฌ์กฐ ๋ณ๊ฒฝ์ด ์์๊ณ , ์ด์ ๋ง์ถ์ด SingleKeyPipedOperationImpl ํด๋์ค๋ฅผ ์ถ๊ฐํฉ๋๋ค.
- PipedOperationImpl ์์ฒด๋ฅผ ์์ ํ ๊ฒฝ์ฐ bulk insert๊น์ง ์ํฅ์ด ๊ฐ๊ธฐ ๋๋ฌธ์, single key ์ ์ฉ operation ํด๋์ค๋ฅผ ์ถ๊ฐํ ํ handleLine ๋ฉ์๋๋ฅผ ์์ ํ์ต๋๋ค.
- ์ด ํด๋์ค๋ฅผ CollectionPipedInsertOperationImpl ๊ฐ ์์๋ฐ๋๋ก ํฉ๋๋ค. (์ถํ CollectionPipedUpdateOperationImpl ๋ฑ ํด๋์ค๋ค๋ ์์๋ฐ์ ์์ ์ ๋๋ค.)
- PipedOperationImpl ํด๋์ค๋ ํ์ดํ๋ฅผ ์ฌ์ฉํ๋ ๋ชจ๋ Operation์ ๋ํ ๋ก์ง์ ๊ด๋ฆฌํ๊ณ , single key๋ multi key์ ๋ํ ์ฒ๋ฆฌ๋ฅผ ์ด ํด๋์ค์ ๊ฐ์ ๋ณ๋ ํด๋์ค์ ๋์ด ๋ถ๋ฆฌ๋ ํํ๋ก ๊ด๋ฆฌํ ์ ์์ต๋๋ค.
@jhpark816 ๋ฆฌ๋ทฐ ๋ฐ์ํ์ต๋๋ค.
@uhm0311 ๋ฆฌ๋ทฐ ๋ฐ๋๋๋ค.
@oliviarla
์ปจํ๋ฆญํธ ํ์ธํด์ฃผ์ธ์.
@uhm0311 @jhpark816 ์ฒซ ์ฝ๋ฉํธ์ ์๋กญ๊ฒ ๊ตฌํ๋ ์ฌํญ์ ๋ค์ ์ ๋ฆฌํด๋์์ต๋๋ค. ๋ฆฌ๋ทฐ ๋ถํ๋๋ฆฝ๋๋ค.
cancel ๊ด๋ จ ์ค๋ช
- ๊ธฐ์กด์๋ future๋ฅผ ํตํ cancel ์์ฒญ์ด "operation์ complete ์ฒ๋ฆฌ์ค์ด๊ณ nextOp๊ฐ ๋จ์์์๋ค๋ฉด" nextOp๋ฅผ cancelํ๋ ๋์ future๋ฅผ ํตํ cancel์ ์คํจํ๋ค๊ณ ์ฒ๋ฆฌํ์์ต๋๋ค.
- cancel ์์ฒญ์ด ๋ค์ด์ค๋ฉด ํ์ฌ ์งํ์ค์ธ op๋ฅผ cancel์ํค๊ฑฐ๋ ํ์ฌ ์งํ์ค์ธ op๋ฅผ complete ์ฒ๋ฆฌ์ค์ด๋ผ๋ฉด nextOp๋ฅผ cancel ์ํค๋๋ก ์ฒ๋ฆฌํ๋ ๊ฒ์ด ์ ์ ํฉ๋๋ค.
- ๋ฐ๋ผ์ ์์ ๊ฐ์ ๋ฐฉ์์ผ๋ก cancel ์์ฒญ์ ์ฒ๋ฆฌํ๊ณ ์ ํฉ๋๋ค.
- ์ฐธ๊ณ ๋ก nextOp์ cancel ์ฒ๋ฆฌ๋ฅผ future์์ ์งํํด์ผ cancel ์๋ฃ ์ฌ๋ถ๋ฅผ ์ฝ๊ฒ ๋ฐ๋ก ํ์ธํ ์ ์์ผ๋ฏ๋ก, future์ ์๋ฃ๋์๊ฑฐ๋ ์ํ์ค์ธ Op ๋ฟ๋ง ์๋๋ผ ์ํ ๋๊ธฐ์ค์ธ ๋ชจ๋ op๊น์ง ๋ด๋๋ก ๋ณ๊ฒฝํ๊ฒ ์ต๋๋ค.
์ฒ๋ฆฌ ํ๋ฆ
- ๋ค์์
future.cancel()์ด ํธ์ถ๋๋ ์์ ์ ๋ฐ๋ฅธ ์ผ์ด์ค๋ฅผ ๋๋ต ์ ๋ฆฌํด๋ณด์์ต๋๋ค.future.cancel()๋ก์ง๊ณผ ๋ฉ์๋ ๋ก์ง์ ๋์์ ์คํ๋ ์ ์์ด ํธ์ถ ์์ ๊ณผ ๋ก์ง ์ํ ์์ ์ด ๋ค๋ฅผ ์ ์์ง๋ง ์ผ๋จ์ ์ผ์ด์ค๋ฅผ ๋๋ ์๊ฐํด๋ณด๊ฒ ์ต๋๋ค.-
ํธ์ถ ์์ ์ ์ฝ๋์
/* <case N> */๋ก ์ ์ด๋์์ต๋๋ค. -
future.cancel()์ ๋๋ต ์๋์ ๊ฐ์ด ๋์ํ ๊ฒ์ ๋๋ค.public boolean cancel(boolean ign) { this.startCancel(); // canceling flag๋ฅผ true๋ก ์ค์ boolean succeed; if (ops.get(currentOpIdx).cancel(ign)) { succeed = true; } else { this.lock(); if (currentOpIdx < ops.size() - 1) { succeed = ops.get(++currentOpIdx).cancel(ign); } else { succeed = false; } this.unlock(); } this.endCancel(); // canceling flag๋ฅผ false๋ก ์ค์ return succeed; }
-
future.cancel() ํธ์ถ์ด transitionState(OperationState.COMPLETE) ํธ์ถ๊ณผ ๊ฒน์น ๋
protected final void transitionState(OperationState newState) {
state = newState;
if (state != OperationState.WRITE_QUEUED &&
state != OperationState.WRITING) {
cmd = null;
}
/* <case 1> */
if (state == OperationState.COMPLETE &&
callbacked.compareAndSet(false, true)) {
/* <case 2> */
callback.complete();
/* <case 3> */
}
}
- <case 1>
- state๋ COMPLETE์ด์ง๋ง cancel() ๋ฉ์๋ ์ํ ์ callbacked๋ฅผ set ํ๊ฒ ๋์ด, cancel ์์ฒญ์ ์ฑ๊ณตํ๋ค.
- <case 2>
- ํด๋น op๋ ์๋ฃ๋ ์ํ์ด๋ฏ๋ก cancel() ๋ฉ์๋ ์ํ์ ์คํจํ๋ค.
-
future.cancel()์์ ๋จ์ op๊ฐ ์๋ ๊ฒฝ์ฐ cancel ์์ผcallback.complete()์์ ๋ ์ด์ op๊ฐ input queue์ ๋ฑ๋ก๋์ง ์๋๋ก ํ๋ค. -
future.cancel()์์ ๋จ์ op๊ฐ ์๋ ๊ฒฝ์ฐ cancel ์์ฒญ์ ์คํจ ์ฒ๋ฆฌ ํ๋ค.
- <case 3>
- ์ด๋ฏธ callback์ ํตํด nextOp๊ฐ ๋ฑ๋ก๋ ์์ ์ด๋ฏ๋ก, ์ํ์ค์ธ operation์ ๋ํ cancel ์์ฒญ์ด ์ฑ๊ณตํ ๊ฒ์ด๋ค.
future.cancel() ํธ์ถ์ด callback.complete()์ ๊ฒน์น ๋
public void complete() {
if (rv.getOperationStatus().isSuccess()) {
Operation op = rv.getNextOp(); // next op๊ฐ ์๋ค๋ฉด op ๊ฐ์ฒด๋ฅผ, ์๋ค๋ฉด Null ๋ฐํ
if (op != null) {
Operation nextOp = ops.get(nextOpIdx);
/* <case 4> */
rv.lock();
if (!nextOp.isCancelled() && !future.isCanceling()) {
addOp(key, nextOp);
rv.setCurrentOpIdx(nextOpIdx);
}
rv.unlock();
} else {
/* <case 5> */
latch.countDown();
}
} else {
/* <case 6> */
// ...
if (nextIndex > 0) {
rv.addEachResult(nextIndex,
new CollectionOperationStatus(false, "NOT_EXECUTED", CollectionResponse.NOT_EXECUTED));
}
latch.countDown();
}
}
- <case 4>
- ํ์ฌ op๊ฐ complete๋์๊ณ nextOp๋ฅผ input queue์ ๋ฃ๊ธฐ ์ง์
future.cancel()์์ฒญ์ด ๋ค์ด์จ ์ํฉ์ด๋ค. - future.cancel ์ํ ์์ ์์ ์ canceling flag๋ฅผ true๋ก ๋๊ณ , nextOp cancel ์์ ๋์ค์๋ nextOp๋ฅผ input queue์ ์ถ๊ฐํ์ง ์๋๋ก lock์ ์ฌ์ฉํ๋ค.
- ํ์ฌ op๊ฐ complete๋์๊ณ nextOp๋ฅผ input queue์ ๋ฃ๊ธฐ ์ง์
- <case 5>
- ํ์ฌ op๊ฐ complete๋์๊ณ nextOp๊ฐ ์์ผ๋ฏ๋ก cancel ์์ฒญ์ ์คํจํ๋ค.
- <case 6>
- ํ์ฌ op๊ฐ ์คํจํ๊ณ nextOp๋ฅผ ์คํํ์ง ์๋๋ค๋ flag๋ฅผ ์ธ์์ผ ํ๋ฏ๋ก cancel ์์ฒญ์ ์คํจํ๋ค.
@oliviarla
์ฝ๋ฉํธ๋ง ๋ด์๋ ์ ์ดํด๊ฐ ๋์ง ์์ต๋๋ค. ์คํ๋ผ์ธ์ผ๋ก ๋ ์์ธํ ์ค๋ช ํด์ฃผ์ค ์ ์๋์?
@uhm0311 @jhpark816 ์ฝ๋ฉํธ ๋ด์ฉ์ ์ข๋ ์์ธํ ์์ ํ์์ต๋๋ค. ๊ฒํ ๋ถํ๋๋ฆฝ๋๋ค.
๊ธฐ์กด PR์ ์ด๋ค ๋ถ๋ถ์ด ์ด๋ป๊ฒ ๋ฌธ์ ๊ฐ ๋์๋์ง๋ถํฐ๊ฐ ์ดํด๊ฐ ์๋๋ ๋ถ๋ถ์ด๋ผ์, ๊ฑฐ๊ธฐ์๋ถํฐ ์ค๋ช ์ด ํ์ํฉ๋๋ค. ์ง๊ธ ์ฝ๋ฉํธ๋ TO-BE์ ๊ดํ ์ค๋ช ๋ง ์์ต๋๋ค. AS-IS์ ๋ฌธ์ ๊ฐ ๋ฌด์์ด์๋์ง, ์ด๋ ํ ์ด์ ๋ก ๋ฌธ์ ์๋์ง๋ถํฐ ์ฒ์ฒํ ์ค๋ช ํด์ฃผ์ธ์.
๊ธฐ์กด cancel ์ฒ๋ฆฌ ๊ณผ์ ์์ operation์ complete ์ฒ๋ฆฌ์ค์ผ ๋ future๋ก๋ถํฐ cancel์ ์์ฒญํ๊ณ nextOp๊ฐ ๋จ์์์๋ค๋ฉด ์ด๋ฅผ cancelํ๋ ๋์ ์คํจ ์๋ต์ ๋ณด๋ด๊ณ ์์์ต๋๋ค.
์คํจ ์๋ต์ด ๋ฌด์์ธ๊ฐ์?
- cancel() ๋ฉ์๋๊ฐ ์คํจํ๋ค๋ ์๋ต?
- ์บ์ ์ฐ์ฐ์ด ์คํจํ๋ค๋ ์๋ต?
- ๋ค๋ฅธ ๊ธฐํ ์๋ต?
@uhm0311 ์๋ณธ ์ฝ๋ฉํธ์ ๋ ์์ธํ ์์ฑํ์ต๋๋ค. cancel()๋ฉ์๋๊ฐ ์คํจํ๋ค๋ ์๋ต์ ์๋ฏธํ์ต๋๋ค.
@uhm0311 @jhpark816 ์ด์ ์ฝ๋ฉํธ๊ฐ ๋๋ฌด ๋ณต์กํ ๋ฐฉํฅ์ผ๋ก ์งํ๋์ด @jhpark816 ๋์ ํผ๋๋ฐฑ์ ๋ฐ๊ณ ์๋์ ๊ฐ์ด ๊ตฌ์์ ๋ณ๊ฒฝํ์์ต๋๋ค.
future.cancel()
- op.cancel์ด ์คํจํ๋ ๊ฒฝ์ฐ๋ ์ด๋ฏธ op๊ฐ complete ์ฒ๋ฆฌ๋๊ณ ์๋ ๊ฒฝ์ฐ ๋ฟ์ด๋ค. ๋ฐ๋ผ์ complete ์ฒ๋ฆฌ๋ op๋ ์คํตํ๊ณ ๋ค์ op๋ฅผ cancel ์๋ํ๋ค.
- ํ์ฌ ์ํ์ค์ธ op๋ถํฐ ์ํํ๋ฉฐ cancel์ด ์ฑ๊ณตํ์๋ค๋ฉด ์ฆ์ ์ฑ๊ณต ์๋ต์ ๋ฐํํ๋๋ก ํ๋ค.
- cancel๋ op๊ฐ ํ๋๋ผ๋ ์๋ค๋ฉด future๋ cancel๋ ์ํ์ด๋ค.
public boolean cancel(boolean ign) {
for (int i = currentOpIdx; i < ops.size(); i++) {
if (ops.get(i).cancel("by application.")) {
return true;
}
}
return false;
}
public boolean isCancelled() {
for (int i=0;i<ops.size();i++) {
if (ops.get(i).isCancelled()) {
return true;
}
}
return false;
}
callback.complete()
- nextOp๋ฅผ cancel ์ํค๋ ๋์์ addOpํ๋ ๊ฒฝ์ฐ cancel ์ํ์ด๋ฉด์ input queue์ ์ถ๊ฐ๋ ์ ์์ผ๋, write queue์์ ์ ์ธ๋๋ฏ๋ก ๋ฌธ์ ๊ฐ ๋ฐ์ํ์ง ์๋๋ค.
public void complete() {
if (rv.getOperationStatus().isSuccess()) {
Operation nextOp = rv.getNextOp(); // next op๊ฐ ์๋ค๋ฉด op ๊ฐ์ฒด๋ฅผ, ์๋ค๋ฉด Null ๋ฐํ
if (nextOp != null && !nextOp.isCancelled()) {
addOp(key, nextOp);
rv.setCurrentOpIdx(nextOpIdx);
} else {
latch.countDown();
}
} else {
// ...
if (nextIndex > 0) {
rv.addEachResult(nextIndex, new CollectionOperationStatus(false, "NOT_EXECUTED", CollectionResponse.NOT_EXECUTED));
}
latch.countDown();
}
}
@uhm0311 @jhpark816 ์ด์ ๋ถํฐ ๋ค์ ๋ฆฌ๋ทฐํด์ฃผ์๋ฉด ๋ฉ๋๋ค.
๋ฆฌ๋ทฐ ๋ฐ์ํ์ต๋๋ค.
@uhm0311 @jhpark816 ํ์ฌ Future์ currentOpIdx, ๋ชจ๋ Operation์ ๋ด์ List ๋ฅผ ๋๋ค๋ณด๋ ํด๋น ๊ฐ์ callback์์๋ ์ฌ์ฉํ๊ณ future์์๋ ์ฌ์ฉํ์ฌ, ๋์์ฑ ๋ฌธ์ ๊ฐ ๋ฐ์ํ ์ฌ์ง๊ฐ ๋์์ ธ์์ต๋๋ค. ๋ํ Future๋ ๊ฐ์ ๊ธฐ๋ค๋ฆฌ๋ ์ญํ ๋ง์ ๊ฐ๋ ๊ฒ์ด ๋จ์ํ๊ณ ์ถํ Future๋ฅผ ํตํฉํ ๋๋ ์ ์ฉํ ๊ฒ์ด๋ผ๋ ์๊ฐ์ด ๋ค์์ต๋๋ค.
๋ฐ๋ผ์ Future์ ํด๋น ๊ฐ๋ค์ ๋์ง ์๊ณ , ๊ธฐ์กด์ ์ํํ๋๋๋ก Future์๋ ํ์ฌ ์คํ์ค์ด๊ฑฐ๋ ์คํ ์๋ฃ๋ Operation๋ง์ ๋ด๋ List๋ฅผ ๊ฐ๋๋ก ๋ณ๊ฒฝํ๋ PR์ ๋ค์ ์ฌ๋ฆฌ๋ ค๊ณ ํฉ๋๋ค.
๊ธฐ์กด์์ ํ ์ํ๋ก ๊ตฌ์กฐ๊ฐ ๋ฐ๋๊ฒ ๋ ์์ธ์ด future.cancel() ํธ์ถ ์ nextOp๊น์ง cancel() ํด๋ณด์ง ์๊ณ ์คํจ ์ฒ๋ฆฌํ์ฌ, nextOp์ ์ฝ๊ฒ ์ ๊ทผํด cancel() ํธ์ถ์ ํ๊ธฐ ์ํจ์ด์์ต๋๋ค.
ํ์ง๋ง ์ด ๋ฌธ์ ๋ Future์์ ์ง์ nextOp์ ์ ๊ทผํ๋ ๋ฐฉ์ ๋์ , ํ๋์ Operation์ next Operation ๋ณ์๋ฅผ ๋ด์ Operation์ cancel ์์ฒญ์ด ๋ค์ด์์ ๋ next Operation cancel()์ ๋ด๋ถ์ ์ผ๋ก ํธ์ถํ ์ ์์ต๋๋ค. ๊ตฌํ์ ์๋ฃ๋ ์ํ์ด๊ณ , ์์ฒด ๋ฆฌ๋ทฐ ๊ฑฐ์ณ ๋ด์ผ ๋ค์ PR ์ฌ๋ฆฌ๊ฒ ์ต๋๋ค.