@ -1,28 +1,23 @@
// @flow
// Unify the synchronization management for bridges with the redux store
// it handles automatically re-calling synchronize
// this is an even high abstraction than the bridge
import logger from 'logger'
import shuffle from 'lodash/shuffle'
import React , { Component } from 'react'
import priorityQueue from 'async/priorityQueue'
import { connect } from 'react-redux'
import type { Account } from '@ledgerhq/live-common/lib/types'
import { createStructuredSelector } from 'reselect'
import { updateAccountWithUpdater } from 'actions/accounts'
import { setAccountSyncState , setAccountPullMoreState } from 'actions/bridgeSync'
import {
bridgeSyncSelector ,
syncStateLocalSelector ,
pullMoreStateLocalSelector ,
} from 'reducers/bridgeSync'
import { setAccountSyncState } from 'actions/bridgeSync'
import { bridgeSyncSelector , syncStateLocalSelector } from 'reducers/bridgeSync'
import type { BridgeSyncState } from 'reducers/bridgeSync'
import { accountsSelector } from 'reducers/accounts'
import { SYNC_BOOT_DELAY , SYNC_INTERVAL } from 'config/constants'
import { SYNC_BOOT_DELAY , SYNC_ALL_INTERVAL } from 'config/constants'
import { getBridgeForCurrency } from '.'
// Unify the synchronization management for bridges with the redux store
// it handles automatically re-calling synchronize
// this is an even high abstraction than the bridge
// $FlowFixMe can't wait flow implement createContext
const BridgeSyncContext = React . createContext ( ( ) => { } )
type BridgeSyncProviderProps = {
children : * ,
}
@ -32,7 +27,6 @@ type BridgeSyncProviderOwnProps = BridgeSyncProviderProps & {
accounts : Account [ ] ,
updateAccountWithUpdater : ( string , ( Account ) => Account ) => void ,
setAccountSyncState : ( string , AsyncState ) => * ,
setAccountPullMoreState : ( string , AsyncState ) => * ,
}
type AsyncState = {
@ -40,15 +34,15 @@ type AsyncState = {
error : ? Error ,
}
type BridgeSync = {
synchronize : ( accountId : string ) => Promise < void > ,
export type BehaviorAction =
| { type : 'BACKGROUND_TICK' }
| { type : 'SET_SKIP_UNDER_PRIORITY' , priority : number }
| { type : 'SYNC_ONE_ACCOUNT' , accountId : string , priority : number }
| { type : 'SYNC_ALL_ACCOUNTS' , priority : number }
// sync for all accounts (if there were errors it stopped)
syncAll : ( ) => { } ,
export type Sync = ( action : BehaviorAction ) => void
//
pullMoreOperations : ( accountId : string , count : number ) => Promise < void > ,
}
const BridgeSyncContext = React . createContext ( ( _ : BehaviorAction ) => { } )
const mapStateToProps = createStructuredSelector ( {
accounts : accountsSelector ,
@ -58,110 +52,104 @@ const mapStateToProps = createStructuredSelector({
const actions = {
updateAccountWithUpdater ,
setAccountSyncState ,
setAccountPullMoreState ,
}
class Provider extends Component < BridgeSyncProviderOwnProps , Bridge Sync> {
class Provider extends Component < BridgeSyncProviderOwnProps , Sync > {
constructor ( ) {
super ( )
const syncPromises = { }
const syncSubs = { }
const pullMorePromises = { }
const getSyncState = accountId => syncStateLocalSelector ( this . props . bridgeSync , { accountId } )
const getPullMoreOperationsState = accountId =>
pullMoreStateLocalSelector ( this . props . bridgeSync , { accountId } )
const synchronize = ( accountId : string , next : ( ) => void ) => {
const state = syncStateLocalSelector ( this . props . bridgeSync , { accountId } )
if ( state . pending ) {
next ( )
return
}
const account = this . props . accounts . find ( a => a . id === accountId )
if ( ! account ) throw new Error ( 'account not found' )
const getAccountById = accountId => {
const a = this . props . accounts . find ( a => a . id === accountId )
if ( ! a ) throw new Error ( 'account not found' )
return a
}
const bridge = getBridgeForCurrency ( account . currency )
const getBridgeForAccountId = accountId =>
getBridgeForCurrency ( getAccountById ( accountId ) . currency )
this . props . setAccountSyncState ( accountId , { pending : true , error : null } )
const pullMoreOperations = ( accountId , count ) => {
const state = getPullMoreOperationsState ( accountId )
if ( state . pending ) {
return (
pullMorePromises [ accountId ] || Promise . reject ( new Error ( 'no pullMore started. (bug)' ) )
)
}
this . props . setAccountPullMoreState ( accountId , { pending : true , error : null } )
const bridge = getBridgeForAccountId ( accountId )
const p = bridge . pullMoreOperations ( getAccountById ( accountId ) , count ) . then (
accountUpdater => {
this . props . setAccountPullMoreState ( accountId , {
pending : false ,
error : null ,
} )
// TODO use Subscription to unsubscribe at relevant time
bridge . synchronize ( account ) . subscribe ( {
next : accountUpdater => {
this . props . updateAccountWithUpdater ( accountId , accountUpdater )
} ,
error => {
this . props . setAccountPullMoreState ( accountId , {
pending : false ,
error ,
} )
complete : ( ) => {
this . props . setAccountSyncState ( accountId , { pending : false , error : null } )
next ( )
} ,
)
pullMorePromises [ accountId ] = p
return p
error : error => {
this . props . setAccountSyncState ( accountId , { pending : false , error } )
next ( )
} ,
} )
}
const synchronize = accountId => {
const state = getSyncState ( accountId )
if ( state . pending ) {
return syncPromises [ accountId ] || Promise . reject ( new Error ( 'no sync started. (bug)' ) )
}
const syncQueue = priorityQueue ( synchronize , 2 )
this . props . setAccountSyncState ( accountId , { pending : true , error : null } )
const bridge = getBridgeForAccountId ( accountId )
const p = new Promise ( ( resolve , reject ) => {
const subscription = bridge . synchronize ( getAccountById ( accountId ) , {
next : accountUpdater => {
this . props . updateAccountWithUpdater ( accountId , accountUpdater )
} ,
complete : ( ) => {
this . props . setAccountSyncState ( accountId , { pending : false , error : null } )
resolve ( )
} ,
error : error => {
this . props . setAccountSyncState ( accountId , { pending : false , error } )
reject ( error )
} ,
} )
syncSubs [ accountId ] = subscription
} )
syncPromises [ accountId ] = p
return p
let skipUnderPriority : number = - 1
const schedule = ( ids : string [ ] , priority : number ) => {
if ( priority < skipUnderPriority ) return
// by convention we remove concurrent tasks with same priority
syncQueue . remove ( o => priority === o . priority )
syncQueue . push ( ids , - priority )
}
const syncAll = ( ) => Promise . all ( this . props . accounts . map ( account => synchronize ( account . id ) ) )
// don't always sync in same order to avoid potential "never account never reached"
const shuffledAccountIds = ( ) => shuffle ( this . props . accounts . map ( a => a . id ) )
const handlers = {
BACKGROUND_TICK : ( ) => {
if ( syncQueue . idle ( ) ) {
schedule ( shuffledAccountIds ( ) , - 1 )
}
} ,
SET_SKIP_UNDER_PRIORITY : ( { priority } ) => {
if ( priority === skipUnderPriority ) return
skipUnderPriority = priority
syncQueue . remove ( ( { priority } ) => priority < skipUnderPriority )
} ,
SYNC_ALL_ACCOUNTS : ( { priority } ) => {
schedule ( shuffledAccountIds ( ) , priority )
} ,
SYNC_ONE_ACCOUNT : ( { accountId , priority } ) => {
schedule ( [ accountId ] , priority )
} ,
}
this . api = {
synchronize ,
syncAll ,
pullMoreOperations ,
const sync = ( action : BehaviorAction ) => {
const handler = handlers [ action . type ]
if ( handler ) {
// $FlowFixMe
handler ( action )
} else {
logger . warn ( 'BridgeSyncContext unsupported action' , action )
}
}
this . api = sync
}
componentDidMount ( ) {
const syncLoop = async ( ) => {
try {
await this . api . syncAll ( )
} catch ( e ) {
logger . error ( 'sync issues' , e )
}
setTimeout ( syncLoop , SYNC_INTERVAL )
this . api ( { type : 'BACKGROUND_TICK' } )
this . syncTimeout = setTimeout ( syncLoop , SYNC_ALL_INTERVAL )
}
setTimeout ( syncLoop , SYNC_BOOT_DELAY )
this . syncTimeout = setTimeout ( syncLoop , SYNC_BOOT_DELAY )
}
// TODO we might want to call sync straight away when new accounts got added (it will happen every 10s anyway)
componentWillUnmount ( ) {
clearTimeout ( this . syncTimeout )
}
api : BridgeSync
syncTimeout : *
api : Sync
render ( ) {
return (