585 lines
24 KiB
Go
585 lines
24 KiB
Go
package cli
|
|
|
|
import (
|
|
"context"
|
|
"encoding/binary"
|
|
"fmt"
|
|
"net"
|
|
"net/http"
|
|
"path/filepath"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/go-chi/chi/v5"
|
|
"github.com/ledgerwatch/erigon-lib/common/dir"
|
|
libstate "github.com/ledgerwatch/erigon-lib/state"
|
|
"github.com/ledgerwatch/erigon/eth/ethconfig"
|
|
"github.com/ledgerwatch/erigon/rpc/rpccfg"
|
|
|
|
"github.com/ledgerwatch/erigon/turbo/debug"
|
|
"github.com/ledgerwatch/erigon/turbo/logging"
|
|
"github.com/wmitsuda/otterscan/cmd/otter/cli/httpcfg"
|
|
"github.com/wmitsuda/otterscan/cmd/otter/health"
|
|
"github.com/wmitsuda/otterscan/cmd/otter/rpcservices"
|
|
|
|
"github.com/ledgerwatch/erigon-lib/direct"
|
|
"github.com/ledgerwatch/erigon-lib/gointerfaces"
|
|
"github.com/ledgerwatch/erigon-lib/gointerfaces/grpcutil"
|
|
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
|
|
"github.com/ledgerwatch/erigon-lib/gointerfaces/txpool"
|
|
"github.com/ledgerwatch/erigon-lib/kv"
|
|
"github.com/ledgerwatch/erigon-lib/kv/kvcache"
|
|
kv2 "github.com/ledgerwatch/erigon-lib/kv/mdbx"
|
|
"github.com/ledgerwatch/erigon-lib/kv/remotedb"
|
|
"github.com/ledgerwatch/erigon-lib/kv/remotedbserver"
|
|
"github.com/ledgerwatch/erigon/cmd/utils"
|
|
"github.com/ledgerwatch/erigon/common/paths"
|
|
"github.com/ledgerwatch/erigon/core/rawdb"
|
|
"github.com/ledgerwatch/erigon/node"
|
|
"github.com/ledgerwatch/erigon/node/nodecfg"
|
|
"github.com/ledgerwatch/erigon/node/nodecfg/datadir"
|
|
"github.com/ledgerwatch/erigon/params"
|
|
"github.com/ledgerwatch/erigon/rpc"
|
|
"github.com/ledgerwatch/erigon/turbo/rpchelper"
|
|
"github.com/ledgerwatch/erigon/turbo/services"
|
|
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
|
|
"github.com/ledgerwatch/erigon/turbo/snapshotsync/snap"
|
|
"github.com/ledgerwatch/log/v3"
|
|
"github.com/spf13/cobra"
|
|
"golang.org/x/sync/semaphore"
|
|
"google.golang.org/grpc"
|
|
grpcHealth "google.golang.org/grpc/health"
|
|
"google.golang.org/grpc/health/grpc_health_v1"
|
|
)
|
|
|
|
var rootCmd = &cobra.Command{
|
|
Use: "otter",
|
|
Short: "otterscan is a chain explorer that can also host a custom JSON RPC server that connects to an Erigon node",
|
|
}
|
|
|
|
func RootCommand() (*cobra.Command, *httpcfg.HttpCfg) {
|
|
utils.CobraFlags(rootCmd, debug.Flags, utils.MetricFlags, logging.Flags)
|
|
|
|
cfg := &httpcfg.HttpCfg{}
|
|
cfg.Enabled = true
|
|
cfg.StateCache = kvcache.DefaultCoherentConfig
|
|
|
|
rootCmd.PersistentFlags().StringVar(&cfg.PrivateApiAddr, "private.api.addr", "127.0.0.1:9090", "private api network address, for example: 127.0.0.1:9090")
|
|
rootCmd.PersistentFlags().StringVar(&cfg.DataDir, "datadir", "", "path to Erigon working directory")
|
|
rootCmd.PersistentFlags().StringVar(&cfg.HttpListenAddress, "http.addr", nodecfg.DefaultHTTPHost, "HTTP-RPC server listening interface")
|
|
rootCmd.PersistentFlags().StringVar(&cfg.TLSCertfile, "tls.cert", "", "certificate for client side TLS handshake")
|
|
rootCmd.PersistentFlags().StringVar(&cfg.TLSKeyFile, "tls.key", "", "key file for client side TLS handshake")
|
|
rootCmd.PersistentFlags().StringVar(&cfg.TLSCACert, "tls.cacert", "", "CA certificate for client side TLS handshake")
|
|
rootCmd.PersistentFlags().IntVar(&cfg.HttpPort, "http.port", nodecfg.DefaultHTTPPort, "HTTP-RPC server listening port")
|
|
rootCmd.PersistentFlags().StringSliceVar(&cfg.HttpCORSDomain, "http.corsdomain", []string{}, "Comma separated list of domains from which to accept cross origin requests (browser enforced)")
|
|
rootCmd.PersistentFlags().StringSliceVar(&cfg.HttpVirtualHost, "http.vhosts", nodecfg.DefaultConfig.HTTPVirtualHosts, "Comma separated list of virtual hostnames from which to accept requests (server enforced). Accepts '*' wildcard.")
|
|
rootCmd.PersistentFlags().BoolVar(&cfg.HttpCompression, "http.compression", true, "Disable http compression")
|
|
rootCmd.PersistentFlags().StringSliceVar(&cfg.API, "http.api", []string{"eth", "erigon"}, "API's offered over the HTTP-RPC interface: eth,erigon,web3,net,debug,trace,txpool,db. Supported methods: https://github.com/ledgerwatch/erigon/tree/devel/cmd/rpcdaemon")
|
|
rootCmd.PersistentFlags().Uint64Var(&cfg.Gascap, "rpc.gascap", 50000000, "Sets a cap on gas that can be used in eth_call/estimateGas")
|
|
rootCmd.PersistentFlags().Uint64Var(&cfg.MaxTraces, "trace.maxtraces", 200, "Sets a limit on traces that can be returned in trace_filter")
|
|
rootCmd.PersistentFlags().BoolVar(&cfg.WebsocketEnabled, "ws", false, "Enable Websockets")
|
|
rootCmd.PersistentFlags().BoolVar(&cfg.WebsocketCompression, "ws.compression", false, "Enable Websocket compression (RFC 7692)")
|
|
rootCmd.PersistentFlags().StringVar(&cfg.RpcAllowListFilePath, "rpc.accessList", "", "Specify granular (method-by-method) API allowlist")
|
|
rootCmd.PersistentFlags().UintVar(&cfg.RpcBatchConcurrency, utils.RpcBatchConcurrencyFlag.Name, 2, utils.RpcBatchConcurrencyFlag.Usage)
|
|
rootCmd.PersistentFlags().BoolVar(&cfg.RpcStreamingDisable, utils.RpcStreamingDisableFlag.Name, false, utils.RpcStreamingDisableFlag.Usage)
|
|
rootCmd.PersistentFlags().IntVar(&cfg.DBReadConcurrency, utils.DBReadConcurrencyFlag.Name, utils.DBReadConcurrencyFlag.Value, utils.DBReadConcurrencyFlag.Usage)
|
|
rootCmd.PersistentFlags().BoolVar(&cfg.TraceCompatibility, "trace.compat", false, "Bug for bug compatibility with OE for trace_ routines")
|
|
rootCmd.PersistentFlags().StringVar(&cfg.TxPoolApiAddr, "txpool.api.addr", "", "txpool api network address, for example: 127.0.0.1:9090 (default: use value of --private.api.addr)")
|
|
rootCmd.PersistentFlags().BoolVar(&cfg.Sync.UseSnapshots, "snapshot", true, utils.SnapshotFlag.Usage)
|
|
rootCmd.PersistentFlags().IntVar(&cfg.StateCache.KeysLimit, "state.cache", kvcache.DefaultCoherentConfig.KeysLimit, "Amount of keys to store in StateCache (enabled if no --datadir set). Set 0 to disable StateCache. 1_000_000 keys ~ equal to 2Gb RAM (maybe we will add RAM accounting in future versions).")
|
|
rootCmd.PersistentFlags().BoolVar(&cfg.GRPCServerEnabled, "grpc", false, "Enable GRPC server")
|
|
rootCmd.PersistentFlags().StringVar(&cfg.GRPCListenAddress, "grpc.addr", nodecfg.DefaultGRPCHost, "GRPC server listening interface")
|
|
rootCmd.PersistentFlags().IntVar(&cfg.GRPCPort, "grpc.port", nodecfg.DefaultGRPCPort, "GRPC server listening port")
|
|
rootCmd.PersistentFlags().BoolVar(&cfg.GRPCHealthCheckEnabled, "grpc.healthcheck", false, "Enable GRPC health check")
|
|
rootCmd.PersistentFlags().BoolVar(&cfg.TraceRequests, utils.HTTPTraceFlag.Name, false, "Trace HTTP requests with INFO level")
|
|
rootCmd.PersistentFlags().DurationVar(&cfg.HTTPTimeouts.ReadTimeout, "http.timeouts.read", rpccfg.DefaultHTTPTimeouts.ReadTimeout, "Maximum duration for reading the entire request, including the body.")
|
|
rootCmd.PersistentFlags().DurationVar(&cfg.HTTPTimeouts.WriteTimeout, "http.timeouts.write", rpccfg.DefaultHTTPTimeouts.WriteTimeout, "Maximum duration before timing out writes of the response. It is reset whenever a new request's header is read")
|
|
rootCmd.PersistentFlags().DurationVar(&cfg.HTTPTimeouts.IdleTimeout, "http.timeouts.idle", rpccfg.DefaultHTTPTimeouts.IdleTimeout, "Maximum amount of time to wait for the next request when keep-alives are enabled. If http.timeouts.idle is zero, the value of http.timeouts.read is used")
|
|
rootCmd.PersistentFlags().DurationVar(&cfg.EvmCallTimeout, "rpc.evmtimeout", rpccfg.DefaultEvmCallTimeout, "Maximum amount of time to wait for the answer from EVM call.")
|
|
|
|
if err := rootCmd.MarkPersistentFlagFilename("rpc.accessList", "json"); err != nil {
|
|
panic(err)
|
|
}
|
|
if err := rootCmd.MarkPersistentFlagDirname("datadir"); err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
// otterscan server setting
|
|
|
|
rootCmd.PersistentFlags().BoolVar(&cfg.OtsServerDisable, "ots.server.addr", false, "disable ots server to run rpc daemon only")
|
|
rootCmd.PersistentFlags().StringVar(&cfg.OtsStaticDir, "ots.static.dir", "./dist", "dir to serve static files from")
|
|
rootCmd.PersistentFlags().BoolVar(&cfg.DisableRpcDaemon, "disable.rpc.daemon", false, "dont run rpc daemon, for use when specifying external rtpc daemon")
|
|
|
|
rootCmd.PersistentFlags().StringVar(&cfg.OtsBeaconApiUrl, "ots.beaconapi.url", "http://localhost:3500", "where the website will make request for beacon api")
|
|
rootCmd.PersistentFlags().StringVar(&cfg.OtsRpcDaemonUrl, "ots.rpcdaemon.url", "/rpc", "where the website will make request for beacon api")
|
|
rootCmd.PersistentFlags().StringVar(&cfg.OtsAssetUrl, "ots.asset.url", "", "where website will make request for assets served by the OTS server")
|
|
|
|
rootCmd.PersistentPreRunE = func(cmd *cobra.Command, args []string) error {
|
|
if err := debug.SetupCobra(cmd); err != nil {
|
|
return err
|
|
}
|
|
cfg.WithDatadir = cfg.DataDir != ""
|
|
if cfg.WithDatadir {
|
|
if cfg.DataDir == "" {
|
|
cfg.DataDir = paths.DefaultDataDir()
|
|
}
|
|
cfg.Dirs = datadir.New(cfg.DataDir)
|
|
}
|
|
if cfg.TxPoolApiAddr == "" {
|
|
cfg.TxPoolApiAddr = cfg.PrivateApiAddr
|
|
}
|
|
return nil
|
|
}
|
|
rootCmd.PersistentPostRunE = func(cmd *cobra.Command, args []string) error {
|
|
debug.Exit()
|
|
return nil
|
|
}
|
|
|
|
cfg.StateCache.MetricsLabel = "rpc"
|
|
|
|
return rootCmd, cfg
|
|
}
|
|
|
|
type StateChangesClient interface {
|
|
StateChanges(ctx context.Context, in *remote.StateChangeRequest, opts ...grpc.CallOption) (remote.KV_StateChangesClient, error)
|
|
}
|
|
|
|
func subscribeToStateChangesLoop(ctx context.Context, client StateChangesClient, cache kvcache.Cache) {
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
if err := subscribeToStateChanges(ctx, client, cache); err != nil {
|
|
if grpcutil.IsRetryLater(err) || grpcutil.IsEndOfStream(err) {
|
|
time.Sleep(3 * time.Second)
|
|
continue
|
|
}
|
|
log.Warn("[txpool.handleStateChanges]", "err", err)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func subscribeToStateChanges(ctx context.Context, client StateChangesClient, cache kvcache.Cache) error {
|
|
streamCtx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
stream, err := client.StateChanges(streamCtx, &remote.StateChangeRequest{WithStorage: true, WithTransactions: false}, grpc.WaitForReady(true))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for req, err := stream.Recv(); ; req, err = stream.Recv() {
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if req == nil {
|
|
return nil
|
|
}
|
|
|
|
cache.OnNewBlock(req)
|
|
}
|
|
}
|
|
|
|
func checkDbCompatibility(ctx context.Context, db kv.RoDB) error {
|
|
// DB schema version compatibility check
|
|
var version []byte
|
|
var compatErr error
|
|
var compatTx kv.Tx
|
|
if compatTx, compatErr = db.BeginRo(ctx); compatErr != nil {
|
|
return fmt.Errorf("open Ro Tx for DB schema compability check: %w", compatErr)
|
|
}
|
|
defer compatTx.Rollback()
|
|
if version, compatErr = compatTx.GetOne(kv.DatabaseInfo, kv.DBSchemaVersionKey); compatErr != nil {
|
|
return fmt.Errorf("read version for DB schema compability check: %w", compatErr)
|
|
}
|
|
if len(version) != 12 {
|
|
return fmt.Errorf("database does not have major schema version. upgrade and restart Erigon core")
|
|
}
|
|
major := binary.BigEndian.Uint32(version)
|
|
minor := binary.BigEndian.Uint32(version[4:])
|
|
patch := binary.BigEndian.Uint32(version[8:])
|
|
var compatible bool
|
|
dbSchemaVersion := &kv.DBSchemaVersion
|
|
if major != dbSchemaVersion.Major {
|
|
compatible = false
|
|
} else if minor != dbSchemaVersion.Minor {
|
|
compatible = false
|
|
} else {
|
|
compatible = true
|
|
}
|
|
if !compatible {
|
|
return fmt.Errorf("incompatible DB Schema versions: reader %d.%d.%d, database %d.%d.%d",
|
|
dbSchemaVersion.Major, dbSchemaVersion.Minor, dbSchemaVersion.Patch,
|
|
major, minor, patch)
|
|
}
|
|
log.Info("DB schemas compatible", "reader", fmt.Sprintf("%d.%d.%d", dbSchemaVersion.Major, dbSchemaVersion.Minor, dbSchemaVersion.Patch),
|
|
"database", fmt.Sprintf("%d.%d.%d", major, minor, patch))
|
|
return nil
|
|
}
|
|
|
|
func EmbeddedServices(ctx context.Context,
|
|
erigonDB kv.RoDB, stateCacheCfg kvcache.CoherentConfig,
|
|
blockReader services.FullBlockReader, snapshots *snapshotsync.RoSnapshots, agg *libstate.Aggregator22,
|
|
ethBackendServer remote.ETHBACKENDServer, txPoolServer txpool.TxpoolServer, miningServer txpool.MiningServer,
|
|
) (eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient, stateCache kvcache.Cache, ff *rpchelper.Filters, err error) {
|
|
if stateCacheCfg.KeysLimit > 0 {
|
|
stateCache = kvcache.NewDummy()
|
|
// notification about new blocks (state stream) doesn't work now inside erigon - because
|
|
// erigon does send this stream to privateAPI (erigon with enabled rpc, still have enabled privateAPI).
|
|
// without this state stream kvcache can't work and only slow-down things
|
|
//
|
|
//stateCache = kvcache.New(stateCacheCfg)
|
|
} else {
|
|
stateCache = kvcache.NewDummy()
|
|
}
|
|
kvRPC := remotedbserver.NewKvServer(ctx, erigonDB, snapshots, agg)
|
|
stateDiffClient := direct.NewStateDiffClientDirect(kvRPC)
|
|
subscribeToStateChangesLoop(ctx, stateDiffClient, stateCache)
|
|
|
|
directClient := direct.NewEthBackendClientDirect(ethBackendServer)
|
|
|
|
eth = rpcservices.NewRemoteBackend(directClient, erigonDB, blockReader)
|
|
txPool = direct.NewTxPoolClient(txPoolServer)
|
|
mining = direct.NewMiningClient(miningServer)
|
|
ff = rpchelper.New(ctx, eth, txPool, mining, func() {})
|
|
|
|
return
|
|
}
|
|
|
|
// RemoteServices - use when RPCDaemon run as independent process. Still it can use --datadir flag to enable
|
|
// `cfg.WithDatadir` (mode when it on 1 machine with Erigon)
|
|
func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger, rootCancel context.CancelFunc) (
|
|
db kv.RoDB, borDb kv.RoDB,
|
|
eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient,
|
|
stateCache kvcache.Cache, blockReader services.FullBlockReader,
|
|
ff *rpchelper.Filters, agg *libstate.Aggregator22, err error) {
|
|
if !cfg.WithDatadir && cfg.PrivateApiAddr == "" {
|
|
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("either remote db or local db must be specified")
|
|
}
|
|
|
|
// Do not change the order of these checks. Chaindata needs to be checked first, because PrivateApiAddr has default value which is not ""
|
|
// If PrivateApiAddr is checked first, the Chaindata option will never work
|
|
if cfg.WithDatadir {
|
|
dir.MustExist(cfg.Dirs.SnapHistory)
|
|
var rwKv kv.RwDB
|
|
log.Trace("Creating chain db", "path", cfg.Dirs.Chaindata)
|
|
limiter := semaphore.NewWeighted(int64(cfg.DBReadConcurrency))
|
|
rwKv, err = kv2.NewMDBX(logger).RoTxsLimiter(limiter).Path(cfg.Dirs.Chaindata).Readonly().Open()
|
|
if err != nil {
|
|
return nil, nil, nil, nil, nil, nil, nil, ff, nil, err
|
|
}
|
|
if compatErr := checkDbCompatibility(ctx, rwKv); compatErr != nil {
|
|
return nil, nil, nil, nil, nil, nil, nil, ff, nil, compatErr
|
|
}
|
|
db = rwKv
|
|
stateCache = kvcache.NewDummy()
|
|
blockReader = snapshotsync.NewBlockReader()
|
|
|
|
// bor (consensus) specific db
|
|
var borKv kv.RoDB
|
|
borDbPath := filepath.Join(cfg.DataDir, "bor")
|
|
{
|
|
// ensure db exist
|
|
tmpDb, err := kv2.NewMDBX(logger).Path(borDbPath).Label(kv.ConsensusDB).Open()
|
|
if err != nil {
|
|
return nil, nil, nil, nil, nil, nil, nil, ff, nil, err
|
|
}
|
|
tmpDb.Close()
|
|
}
|
|
log.Trace("Creating consensus db", "path", borDbPath)
|
|
borKv, err = kv2.NewMDBX(logger).Path(borDbPath).Label(kv.ConsensusDB).Readonly().Open()
|
|
if err != nil {
|
|
return nil, nil, nil, nil, nil, nil, nil, ff, nil, err
|
|
}
|
|
// Skip the compatibility check, until we have a schema in erigon-lib
|
|
borDb = borKv
|
|
} else {
|
|
if cfg.StateCache.KeysLimit > 0 {
|
|
stateCache = kvcache.NewDummy()
|
|
//stateCache = kvcache.New(cfg.StateCache)
|
|
} else {
|
|
stateCache = kvcache.NewDummy()
|
|
}
|
|
log.Info("if you run RPCDaemon on same machine with Erigon add --datadir option")
|
|
}
|
|
|
|
if db != nil {
|
|
var cc *params.ChainConfig
|
|
if err := db.View(context.Background(), func(tx kv.Tx) error {
|
|
genesisBlock, err := rawdb.ReadBlockByNumber(tx, 0)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if genesisBlock == nil {
|
|
return fmt.Errorf("genesis not found in DB. Likely Erigon was never started on this datadir")
|
|
}
|
|
cc, err = rawdb.ReadChainConfig(tx, genesisBlock.Hash())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
cfg.Snap.Enabled, err = snap.Enabled(tx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}); err != nil {
|
|
return nil, nil, nil, nil, nil, nil, nil, ff, nil, err
|
|
}
|
|
if cc == nil {
|
|
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("chain config not found in db. Need start erigon at least once on this db")
|
|
}
|
|
cfg.Snap.Enabled = cfg.Snap.Enabled || cfg.Sync.UseSnapshots
|
|
}
|
|
|
|
creds, err := grpcutil.TLS(cfg.TLSCACert, cfg.TLSCertfile, cfg.TLSKeyFile)
|
|
if err != nil {
|
|
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("open tls cert: %w", err)
|
|
}
|
|
conn, err := grpcutil.Connect(creds, cfg.PrivateApiAddr)
|
|
if err != nil {
|
|
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("could not connect to execution service privateApi: %w", err)
|
|
}
|
|
|
|
kvClient := remote.NewKVClient(conn)
|
|
remoteKv, err := remotedb.NewRemote(gointerfaces.VersionFromProto(remotedbserver.KvServiceAPIVersion), logger, kvClient).Open()
|
|
if err != nil {
|
|
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("could not connect to remoteKv: %w", err)
|
|
}
|
|
|
|
subscribeToStateChangesLoop(ctx, kvClient, stateCache)
|
|
|
|
onNewSnapshot := func() {}
|
|
if cfg.WithDatadir {
|
|
if cfg.Snap.Enabled {
|
|
|
|
allSnapshots := snapshotsync.NewRoSnapshots(cfg.Snap, cfg.Dirs.Snap)
|
|
// To povide good UX - immediatly can read snapshots after RPCDaemon start, even if Erigon is down
|
|
// Erigon does store list of snapshots in db: means RPCDaemon can read this list now, but read by `kvClient.Snapshots` after establish grpc connection
|
|
allSnapshots.OptimisticReopenWithDB(db)
|
|
allSnapshots.LogStat()
|
|
|
|
if agg, err = libstate.NewAggregator22(cfg.Dirs.SnapHistory, cfg.Dirs.Tmp, ethconfig.HistoryV3AggregationStep, db); err != nil {
|
|
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("create aggregator: %w", err)
|
|
}
|
|
if err = agg.ReopenFiles(); err != nil {
|
|
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("create aggregator: %w", err)
|
|
}
|
|
|
|
db.View(context.Background(), func(tx kv.Tx) error {
|
|
agg.LogStats(tx, func(endTxNumMinimax uint64) uint64 {
|
|
_, histBlockNumProgress, _ := rawdb.TxNums.FindBlockNum(tx, endTxNumMinimax)
|
|
return histBlockNumProgress
|
|
})
|
|
return nil
|
|
})
|
|
|
|
onNewSnapshot = func() {
|
|
go func() { // don't block events processing by network communication
|
|
reply, err := kvClient.Snapshots(ctx, &remote.SnapshotsRequest{}, grpc.WaitForReady(true))
|
|
if err != nil {
|
|
log.Warn("[Snapshots] reopen", "err", err)
|
|
return
|
|
}
|
|
if err := allSnapshots.ReopenList(reply.BlockFiles, true); err != nil {
|
|
log.Error("[Snapshots] reopen", "err", err)
|
|
} else {
|
|
allSnapshots.LogStat()
|
|
}
|
|
|
|
if err = agg.ReopenFiles(); err != nil {
|
|
log.Error("[Snapshots] reopen", "err", err)
|
|
} else {
|
|
db.View(context.Background(), func(tx kv.Tx) error {
|
|
agg.LogStats(tx, func(endTxNumMinimax uint64) uint64 {
|
|
_, histBlockNumProgress, _ := rawdb.TxNums.FindBlockNum(tx, endTxNumMinimax)
|
|
return histBlockNumProgress
|
|
})
|
|
return nil
|
|
})
|
|
}
|
|
}()
|
|
}
|
|
onNewSnapshot()
|
|
// TODO: how to don't block startup on remote RPCDaemon?
|
|
// txNums = exec22.TxNumsFromDB(allSnapshots, db)
|
|
blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
|
|
} else {
|
|
log.Info("Use --snapshots=false")
|
|
}
|
|
}
|
|
|
|
if !cfg.WithDatadir {
|
|
blockReader = snapshotsync.NewRemoteBlockReader(remote.NewETHBACKENDClient(conn))
|
|
}
|
|
remoteEth := rpcservices.NewRemoteBackend(remote.NewETHBACKENDClient(conn), db, blockReader)
|
|
blockReader = remoteEth
|
|
|
|
txpoolConn := conn
|
|
if cfg.TxPoolApiAddr != cfg.PrivateApiAddr {
|
|
txpoolConn, err = grpcutil.Connect(creds, cfg.TxPoolApiAddr)
|
|
if err != nil {
|
|
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("could not connect to txpool api: %w", err)
|
|
}
|
|
}
|
|
|
|
mining = txpool.NewMiningClient(txpoolConn)
|
|
miningService := rpcservices.NewMiningService(mining)
|
|
txPool = txpool.NewTxpoolClient(txpoolConn)
|
|
txPoolService := rpcservices.NewTxPoolService(txPool)
|
|
if db == nil {
|
|
db = remoteKv
|
|
}
|
|
eth = remoteEth
|
|
go func() {
|
|
if !remoteKv.EnsureVersionCompatibility() {
|
|
rootCancel()
|
|
}
|
|
if !remoteEth.EnsureVersionCompatibility() {
|
|
rootCancel()
|
|
}
|
|
if mining != nil && !miningService.EnsureVersionCompatibility() {
|
|
rootCancel()
|
|
}
|
|
if !txPoolService.EnsureVersionCompatibility() {
|
|
rootCancel()
|
|
}
|
|
}()
|
|
|
|
ff = rpchelper.New(ctx, eth, txPool, mining, onNewSnapshot)
|
|
return db, borDb, eth, txPool, mining, stateCache, blockReader, ff, agg, err
|
|
}
|
|
|
|
func StartRpcServer(ctx context.Context, r chi.Router, cfg httpcfg.HttpCfg, rpcAPI []rpc.API) error {
|
|
if cfg.Enabled {
|
|
return startServer(ctx, r, cfg, rpcAPI)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func startServer(ctx context.Context, r chi.Router, cfg httpcfg.HttpCfg, rpcAPI []rpc.API) error {
|
|
// register apis and create handler stack
|
|
httpEndpoint := fmt.Sprintf("%s:%d", cfg.HttpListenAddress, cfg.HttpPort)
|
|
|
|
log.Trace("TraceRequests = %t\n", cfg.TraceRequests)
|
|
srv := rpc.NewServer(cfg.RpcBatchConcurrency, cfg.TraceRequests, cfg.RpcStreamingDisable)
|
|
|
|
allowListForRPC, err := parseAllowListForRPC(cfg.RpcAllowListFilePath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
srv.SetAllowList(allowListForRPC)
|
|
|
|
var defaultAPIList []rpc.API
|
|
|
|
for _, api := range rpcAPI {
|
|
if api.Namespace != "engine" {
|
|
defaultAPIList = append(defaultAPIList, api)
|
|
}
|
|
}
|
|
|
|
var apiFlags []string
|
|
for _, flag := range cfg.API {
|
|
if flag != "engine" {
|
|
apiFlags = append(apiFlags, flag)
|
|
}
|
|
}
|
|
|
|
if err := node.RegisterApisFromWhitelist(defaultAPIList, apiFlags, srv, false); err != nil {
|
|
return fmt.Errorf("could not start register RPC apis: %w", err)
|
|
}
|
|
|
|
httpHandler := node.NewHTTPHandlerStack(srv, cfg.HttpCORSDomain, cfg.HttpVirtualHost, cfg.HttpCompression)
|
|
var wsHandler http.Handler
|
|
if cfg.WebsocketEnabled {
|
|
wsHandler = srv.WebsocketHandler([]string{"*"}, nil, cfg.WebsocketCompression)
|
|
}
|
|
|
|
apiHandler, err := createHandler(cfg, defaultAPIList, httpHandler, wsHandler, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
r.Mount("/rpc", apiHandler)
|
|
listener, _, err := node.StartHTTPEndpoint(httpEndpoint, cfg.HTTPTimeouts, r)
|
|
if err != nil {
|
|
return fmt.Errorf("could not start RPC api: %w", err)
|
|
}
|
|
info := &[]interface{}{"url", httpEndpoint, "ws", cfg.WebsocketEnabled,
|
|
"ws.compression", cfg.WebsocketCompression, "grpc", cfg.GRPCServerEnabled}
|
|
if cfg.GRPCServerEnabled {
|
|
startGrpcServer(ctx, info, cfg)
|
|
if err != nil {
|
|
return fmt.Errorf("could not start GRPC api: %w", err)
|
|
}
|
|
}
|
|
log.Info("HTTP endpoint opened", *info...)
|
|
defer func() {
|
|
srv.Stop()
|
|
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
_ = listener.Shutdown(shutdownCtx)
|
|
log.Info("HTTP endpoint closed", "url", httpEndpoint)
|
|
}()
|
|
<-ctx.Done()
|
|
log.Info("Exiting...")
|
|
return nil
|
|
}
|
|
|
|
func startGrpcServer(ctx context.Context, info *[]any, cfg httpcfg.HttpCfg) (err error) {
|
|
var (
|
|
healthServer *grpcHealth.Server
|
|
grpcServer *grpc.Server
|
|
grpcListener net.Listener
|
|
grpcEndpoint string
|
|
)
|
|
grpcEndpoint = fmt.Sprintf("%s:%d", cfg.GRPCListenAddress, cfg.GRPCPort)
|
|
if grpcListener, err = net.Listen("tcp", grpcEndpoint); err != nil {
|
|
return fmt.Errorf("could not start GRPC listener: %w", err)
|
|
}
|
|
grpcServer = grpc.NewServer()
|
|
if cfg.GRPCHealthCheckEnabled {
|
|
healthServer = grpcHealth.NewServer()
|
|
grpc_health_v1.RegisterHealthServer(grpcServer, healthServer)
|
|
}
|
|
go grpcServer.Serve(grpcListener)
|
|
*info = append(*info, "grpc.port", cfg.GRPCPort)
|
|
|
|
defer func() {
|
|
if cfg.GRPCHealthCheckEnabled {
|
|
healthServer.Shutdown()
|
|
}
|
|
grpcServer.GracefulStop()
|
|
_ = grpcListener.Close()
|
|
log.Info("GRPC endpoint closed", "url", grpcEndpoint)
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
func createHandler(cfg httpcfg.HttpCfg, apiList []rpc.API, httpHandler http.Handler, wsHandler http.Handler, jwtSecret []byte) (http.Handler, error) {
|
|
var handler http.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
// adding a healthcheck here
|
|
if health.ProcessHealthcheckIfNeeded(w, r, apiList) {
|
|
return
|
|
}
|
|
if cfg.WebsocketEnabled && wsHandler != nil && isWebsocket(r) {
|
|
wsHandler.ServeHTTP(w, r)
|
|
return
|
|
}
|
|
|
|
if jwtSecret != nil && !rpc.CheckJwtSecret(w, r, jwtSecret) {
|
|
return
|
|
}
|
|
|
|
httpHandler.ServeHTTP(w, r)
|
|
})
|
|
|
|
return handler, nil
|
|
}
|
|
|
|
// isWebsocket checks the header of a http request for a websocket upgrade request.
|
|
func isWebsocket(r *http.Request) bool {
|
|
return strings.EqualFold(r.Header.Get("Upgrade"), "websocket") &&
|
|
strings.Contains(strings.ToLower(r.Header.Get("Connection")), "upgrade")
|
|
}
|