New interface for EventSubscriptions
PR #474 introduces a way for us to unsubscribe from callbacks. Typically our callbacks are attached to long-running processes (i.e. Ethereum watchers: goroutines which parse log files and signal back on a channel). Unfortunately, the implementation has quite a bit of code duplication (generics!) and the unsubscribe isn't deferrable.
@Shadowfiend proposed the following:
type <T>EventSubscription interface {
func OnEvent(func (T)) EventSubscription // unsubscribe just runs the unsubscribe
func OnEventContext(func (T), context *context.Context) // unsubscribe unsubscribes + handles context
func Pipe(chan T) EventSubscription // unsubscribe does the unsubscribe + closes chan
func PipeContext(chan T, context *context.Context) // unsubscribe unsubscribes + closes chan + handles context
}
type EventSubscription interface {
func Unsubscribe()
}
type Chain interface {
...
DKGResultPublication() DKGResultPublicationEventSubscription
}
subscription := chainRelay.DKGResultPublication().OnEvent(...)
defer subscription.Unsubscribe()
publishChan := make(chan *event.DKGResultPublication)
subscription := chainRelay.DKGResultPublication().Pipe(publishChan)
defer subscription.Unsubscribe()
This presents a better way forward (less duplication, deferrable unsubscribes, etc.). I attempted this with a bit of code generation, but gave up halfway through for lack of time. Let's pick this up again.
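For illustration only, here is a rough sketch of how the handler half of the proposal could look with Go type parameters (Go 1.18+) instead of code generation. `Source`, `NewSource`, and `Emit` are hypothetical names standing in for a chain watcher, and `OnEventContext` here takes the context first and returns a subscription, which is an assumption rather than the signature proposed above.

```go
package main

import (
	"context"
	"sync"
)

// EventSubscription mirrors the interface from the proposal.
type EventSubscription interface {
	Unsubscribe()
}

// subscription runs its cleanup at most once, so Unsubscribe is safe to
// defer and safe to call more than once.
type subscription struct {
	once    sync.Once
	cleanup func()
}

func (s *subscription) Unsubscribe() { s.once.Do(s.cleanup) }

// Source is a hypothetical in-memory event source standing in for a watcher.
type Source[T any] struct {
	mutex    sync.Mutex
	nextID   int
	handlers map[int]func(T)
}

func NewSource[T any]() *Source[T] {
	return &Source[T]{handlers: make(map[int]func(T))}
}

// OnEvent registers handler and returns a subscription that removes it.
func (s *Source[T]) OnEvent(handler func(T)) EventSubscription {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	id := s.nextID
	s.nextID++
	s.handlers[id] = handler
	return &subscription{cleanup: func() {
		s.mutex.Lock()
		defer s.mutex.Unlock()
		delete(s.handlers, id)
	}}
}

// OnEventContext behaves like OnEvent and also unsubscribes once ctx is done.
func (s *Source[T]) OnEventContext(ctx context.Context, handler func(T)) EventSubscription {
	done := make(chan struct{})
	inner := s.OnEvent(handler)
	sub := &subscription{cleanup: func() {
		inner.Unsubscribe()
		close(done) // lets the goroutine below exit
	}}
	go func() {
		select {
		case <-ctx.Done():
			sub.Unsubscribe()
		case <-done:
		}
	}()
	return sub
}

// Emit delivers event to the handlers registered at the time of the call.
// Handlers run outside the lock, so a handler may call Unsubscribe itself.
func (s *Source[T]) Emit(event T) {
	s.mutex.Lock()
	handlers := make([]func(T), 0, len(s.handlers))
	for _, h := range s.handlers {
		handlers = append(handlers, h)
	}
	s.mutex.Unlock()
	for _, h := range handlers {
		h(event)
	}
}
```

A caller would write `sub := source.OnEvent(handler)` followed by `defer sub.Unsubscribe()`. Because handlers are copied before they run, an `Emit` racing with `Unsubscribe` may still deliver one last event; whether that is acceptable is a design decision this sketch does not settle.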
While working on this card, we need to make sure our subscriptions are handled properly. We have some technical debt to pay in the Ethereum integration layer: a lot of code duplication, some hairy mutex synchronization, and some complex conditions that prevent us from writing to closed channels. All of it needs to be sorted out as part of this card.
The card covers not only designing and implementing the mechanism but also porting all of the existing code to the new solution.
Some potential inspirations: https://blog.golang.org/pipelines
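One possible shape for the `Pipe` variant, sketched under assumptions: a single mutex and a `closed` flag serialize every send with the close, so a late callback cannot write to a closed channel. The `pipe` type, `newPipe`, and the policy of dropping events when the receiver is not ready are all hypothetical choices, not something the existing code does.

```go
package main

import "sync"

// pipe forwards events into out until it is unsubscribed.
type pipe[T any] struct {
	mutex  sync.Mutex
	closed bool
	out    chan T
}

func newPipe[T any](out chan T) *pipe[T] {
	return &pipe[T]{out: out}
}

// send delivers event unless the pipe was unsubscribed. It reports whether
// the event was delivered.
func (p *pipe[T]) send(event T) bool {
	p.mutex.Lock()
	defer p.mutex.Unlock()
	if p.closed {
		return false
	}
	select {
	case p.out <- event:
		return true
	default:
		// Receiver not ready: drop rather than block while holding the lock.
		return false
	}
}

// Unsubscribe closes the channel exactly once; repeated calls are no-ops.
func (p *pipe[T]) Unsubscribe() {
	p.mutex.Lock()
	defer p.mutex.Unlock()
	if !p.closed {
		p.closed = true
		close(p.out)
	}
}
```

The non-blocking send is what keeps `Unsubscribe` from waiting behind a stuck sender. If dropping events is not acceptable, the send would need a different strategy, for example selecting on a separate done channel instead of holding the lock.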
- [ ] PoC - 3 days
- [ ] code generation - 2 days
- [ ] concurrent test - 3 days
- [ ] ethereum API update
- [ ] local API update
- [ ] review all subscriptions, and make sure we unsubscribe where needed - 5 days for this and the two items above
Total: ~13 days
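As a rough idea of what the concurrent test item could exercise, the sketch below races several senders against an unsubscribe and counts recovered panics. `guardedChan` and `stress` are hypothetical names; in the real suite this would presumably live in a `_test.go` file and be run with `go test -race`.

```go
package main

import (
	"sync"
	"sync/atomic"
)

// guardedChan is a hypothetical stand-in for a subscription's output channel.
type guardedChan struct {
	mutex  sync.Mutex
	closed bool
	out    chan int
}

func (g *guardedChan) send(v int) {
	g.mutex.Lock()
	defer g.mutex.Unlock()
	if g.closed {
		return
	}
	select {
	case g.out <- v:
	default: // buffer full: drop the event
	}
}

func (g *guardedChan) unsubscribe() {
	g.mutex.Lock()
	defer g.mutex.Unlock()
	if !g.closed {
		g.closed = true
		close(g.out)
	}
}

// stress runs senders concurrently with one unsubscribe and returns the
// number of goroutines that panicked.
func stress(senders, eventsPerSender int) int32 {
	g := &guardedChan{out: make(chan int, 16)}
	var panics int32
	var wg sync.WaitGroup
	run := func(f func()) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() {
				if recover() != nil {
					atomic.AddInt32(&panics, 1)
				}
			}()
			f()
		}()
	}
	for i := 0; i < senders; i++ {
		run(func() {
			for j := 0; j < eventsPerSender; j++ {
				g.send(j)
			}
		})
	}
	run(g.unsubscribe)
	wg.Wait()
	return atomic.LoadInt32(&panics)
}
```

Removing the `closed` check from `send` should make a run such as `stress(8, 1000)` report panics at least some of the time, which is a quick way to confirm the test actually covers the race.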
Picked this one up. Also, a possibly related issue:
After 12+ hours of executing relay requests locally, I observed a fatal failure of the client:
18:54:58.095 INFO keep-relay: [member:1,channel:579d0,state:*entry.entrySubmissionState] transitioning to a new state at block: [1308] machine.go:132
18:54:58.103 INFO keep-entry: [member:3] waiting for block [1314] to submit submission.go:123
18:54:58.104 INFO keep-entry: [member:5] waiting for block [1320] to submit submission.go:123
18:54:58.104 INFO keep-entry: [member:1] waiting for block [1308] to submit submission.go:123
18:54:58.176 INFO keep-entry: [member:1] submitting relay entry [61152635787414383325787830737202178302596509978190241205488548435566954490300] on behalf of group [[156 40 248 74 9 203 234 18 81 157 111 165 216 64 28 223 29 28 187 194 135 137 156 29 108 226 13 127 1 106 234 9 13 136 248 230 75 57 11 152 162 145 250 234 201 79 250 223 72 219 137 104 251 238 195 5 223 152 181 28 75 37 230 234]] submission.go:83
18:55:11.832 INFO keep-entry: [member:5] leaving; relay entry submitted by other member submission.go:103
18:55:11.832 INFO keep-entry: [member:3] submitting relay entry [61152635787414383325787830737202178302596509978190241205488548435566954490300] on behalf of group [[156 40 248 74 9 203 234 18 81 157 111 165 216 64 28 223 29 28 187 194 135 137 156 29 108 226 13 127 1 106 234 9 13 136 248 230 75 57 11 152 162 145 250 234 201 79 250 223 72 219 137 104 251 238 195 5 223 152 181 28 75 37 230 234]] submission.go:83
18:55:11.832 INFO keep-relay: [member:5,channel:579d0,state:*entry.entrySubmissionState] transitioned to new state machine.go:171
18:55:11.832 INFO keep-relay: [member:5,channel:579d0,state:*entry.entrySubmissionState] reached final state at block: [1308] machine.go:99
panic: send on closed channel
goroutine 2866 [running]:
github.com/keep-network/keep-core/pkg/beacon/relay/entry.(*relayEntrySubmitter).submitRelayEntry.func1(0xc00067c6e0)
/Users/piotr/go/src/github.com/keep-network/keep-core/pkg/beacon/relay/entry/submission.go:45 +0x3e
github.com/keep-network/keep-core/pkg/chain/ethereum.(*ethereumChain).OnSignatureSubmitted.func1(0xc0007085e0, 0xc00012a340, 0x40, 0x40, 0xc000708680, 0xc0007086a0, 0x522)
/Users/piotr/go/src/github.com/keep-network/keep-core/pkg/chain/ethereum/ethereum.go:236 +0x13f
github.com/keep-network/keep-core/pkg/chain/gen/contract.(*KeepRandomBeaconOperator).WatchSignatureSubmitted.func1(0xc0008c49c0, 0xc00078e150, 0xc0006701c0, 0x4b624c0, 0xc0005df280, 0x4a81158)
/Users/piotr/go/src/github.com/keep-network/keep-core/pkg/chain/gen/contract/KeepRandomBeaconOperator.go:2315 +0x80
created by github.com/keep-network/keep-core/pkg/chain/gen/contract.(*KeepRandomBeaconOperator).WatchSignatureSubmitted
/Users/piotr/go/src/github.com/keep-network/keep-core/pkg/chain/gen/contract/KeepRandomBeaconOperator.go:2305 +0x24f
Peers connected to that client lost their connection to it:
Connection to bootstrap peer [16Uiu2HAm82kFx5PMHWUARfKhPhg9gQVdasiySjZaPPsNUjuE59TV] failed. Retrying...
Connection to bootstrap peer [16Uiu2HAm82kFx5PMHWUARfKhPhg9gQVdasiySjZaPPsNUjuE59TV] failed. Retrying...
Connection to bootstrap peer [16Uiu2HAm82kFx5PMHWUARfKhPhg9gQVdasiySjZaPPsNUjuE59TV] failed. Retrying...
Connection to bootstrap peer [16Uiu2HAm82kFx5PMHWUARfKhPhg9gQVdasiySjZaPPsNUjuE59TV] failed. Retrying...