keep-core icon indicating copy to clipboard operation
keep-core copied to clipboard

New interface for EventSubscriptions

Open rargulati opened this issue 7 years ago • 4 comments

PR #474 introduces a way for us to unsubscribe from callbacks. Typically our callbacks are attached to long running processes (ie. Ethereum Watchers: goroutines which parse log files and signal back on a channel). Unfortunately, in the implementation, there's quite a bit of code duplication (generics!) and the unsubscribe isn't deferred.

@Shadowfiend proposed the following:

type <T>EventSubscription interface {
    func OnEvent(func (T)) EventSubscription // unsubscribe just runs the unsubscribe
    func OnEventContext(func (T), context *context.Context) // unsubscribe unsubscribes + handles context
    func Pipe(chan T) EventSubscription // unsubscribe does the unsubscribe + closes chan
    func PipeContext(chan T, context *context.Context) // unsubscribe unsubscribes + closes chan + handles context
}

type EventSubscription interface {
    func Unsubscribe()
}

type Chain interface {
    ...
    DKGResultPublication() DKGResultPublicationEventSubscription
}

subscription := chainRelay.DKGResultPublication().OnEvent(...)
defer subscription.Unsubscribe()

publishChan := make(chan *event.DKGResultPublication)
subscription := chainRelay.DKGResultPublication().Pipe(publishChan)
defer subscription.Unsubscribe()

This presents a more ideal way forward (less duplication, deferrable unsubscribes, etc). I attempted this with a bit of code generation, but gave up half way through (time). Let's pick this up again.

rargulati avatar Jan 08 '19 23:01 rargulati

During work on this card, we need to ensure our subscriptions are handled properly. We have some technical debt to pay in Ethereum integration layer - a lot of code duplication, some hairy mutex synchronization, and some complex conditions preventing us from writing to closed channels. It all needs to be sorted out during the work on this card.

It covers not only designing and implementing the mechanism but also porting all of the existing code to the new solution.

Some potential inspirations: https://blog.golang.org/pipelines

pdyraga avatar Mar 13 '19 20:03 pdyraga

  • [ ] PoC - 3 days
  • [ ] code generation - 2 days
  • [ ] concurrent test - 3 days
  • [ ] ethereum API update
  • [ ] local API update
  • [ ] review all subscriptions, and make sure we unsubscribe where needed - 5 days for 3 items above

~ 13 days

pdyraga avatar Mar 14 '19 15:03 pdyraga

Picked this guy up. Also, possible related issue.

Shadowfiend avatar May 22 '19 12:05 Shadowfiend

After 12+ hours of executing relay requests locally, I observed a fatal failure of the client:

18:54:58.095  INFO keep-relay: [member:1,channel:579d0,state:*entry.entrySubmissionState] transitioning to a new state at block: [1308] machine.go:132
18:54:58.103  INFO keep-entry: [member:3] waiting for block [1314] to submit submission.go:123
18:54:58.104  INFO keep-entry: [member:5] waiting for block [1320] to submit submission.go:123
18:54:58.104  INFO keep-entry: [member:1] waiting for block [1308] to submit submission.go:123
18:54:58.176  INFO keep-entry: [member:1] submitting relay entry [61152635787414383325787830737202178302596509978190241205488548435566954490300] on behalf of group [[156 40 248 74 9 203 234 18 81 157 111 165 216 64 28 223 29 28 187 194 135 137 156 29 108 226 13 127 1 106 234 9 13 136 248 230 75 57 11 152 162 145 250 234 201 79 250 223 72 219 137 104 251 238 195 5 223 152 181 28 75 37 230 234]] submission.go:83
18:55:11.832  INFO keep-entry: [member:5] leaving; relay entry submitted by other member submission.go:103
18:55:11.832  INFO keep-entry: [member:3] submitting relay entry [61152635787414383325787830737202178302596509978190241205488548435566954490300] on behalf of group [[156 40 248 74 9 203 234 18 81 157 111 165 216 64 28 223 29 28 187 194 135 137 156 29 108 226 13 127 1 106 234 9 13 136 248 230 75 57 11 152 162 145 250 234 201 79 250 223 72 219 137 104 251 238 195 5 223 152 181 28 75 37 230 234]] submission.go:83
18:55:11.832  INFO keep-relay: [member:5,channel:579d0,state:*entry.entrySubmissionState] transitioned to new state machine.go:171
18:55:11.832  INFO keep-relay: [member:5,channel:579d0,state:*entry.entrySubmissionState] reached final state at block: [1308] machine.go:99
panic: send on closed channel

goroutine 2866 [running]:
github.com/keep-network/keep-core/pkg/beacon/relay/entry.(*relayEntrySubmitter).submitRelayEntry.func1(0xc00067c6e0)
	/Users/piotr/go/src/github.com/keep-network/keep-core/pkg/beacon/relay/entry/submission.go:45 +0x3e
github.com/keep-network/keep-core/pkg/chain/ethereum.(*ethereumChain).OnSignatureSubmitted.func1(0xc0007085e0, 0xc00012a340, 0x40, 0x40, 0xc000708680, 0xc0007086a0, 0x522)
	/Users/piotr/go/src/github.com/keep-network/keep-core/pkg/chain/ethereum/ethereum.go:236 +0x13f
github.com/keep-network/keep-core/pkg/chain/gen/contract.(*KeepRandomBeaconOperator).WatchSignatureSubmitted.func1(0xc0008c49c0, 0xc00078e150, 0xc0006701c0, 0x4b624c0, 0xc0005df280, 0x4a81158)
	/Users/piotr/go/src/github.com/keep-network/keep-core/pkg/chain/gen/contract/KeepRandomBeaconOperator.go:2315 +0x80
created by github.com/keep-network/keep-core/pkg/chain/gen/contract.(*KeepRandomBeaconOperator).WatchSignatureSubmitted
	/Users/piotr/go/src/github.com/keep-network/keep-core/pkg/chain/gen/contract/KeepRandomBeaconOperator.go:2305 +0x24f

Peers connected to that one lost him:

Connection to bootstrap peer [16Uiu2HAm82kFx5PMHWUARfKhPhg9gQVdasiySjZaPPsNUjuE59TV] failed. Retrying...
Connection to bootstrap peer [16Uiu2HAm82kFx5PMHWUARfKhPhg9gQVdasiySjZaPPsNUjuE59TV] failed. Retrying...
Connection to bootstrap peer [16Uiu2HAm82kFx5PMHWUARfKhPhg9gQVdasiySjZaPPsNUjuE59TV] failed. Retrying...
Connection to bootstrap peer [16Uiu2HAm82kFx5PMHWUARfKhPhg9gQVdasiySjZaPPsNUjuE59TV] failed. Retrying...

pdyraga avatar Oct 22 '19 07:10 pdyraga