Files
go-electrum/electrum/network.go
2025-08-14 19:49:29 -06:00

279 lines
5.5 KiB
Go

package electrum
import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"log"
"sync"
"sync/atomic"
)
const (
// ClientVersion identifies the client version/name to the remote server
ClientVersion = "go-electrum1.1"
// ProtocolVersion identifies the support protocol version to the remote server
ProtocolVersion = "1.4"
nl = byte('\n')
)
var (
// DebugMode provides debug output on communications with the remote server if enabled.
DebugMode bool
// ErrServerConnected throws an error if remote server is already connected.
ErrServerConnected = errors.New("server is already connected")
// ErrServerShutdown throws an error if remote server has shutdown.
ErrServerShutdown = errors.New("server has shutdown")
// ErrTimeout throws an error if request has timed out
ErrTimeout = errors.New("request timeout")
// ErrNotImplemented throws an error if this RPC call has not been implemented yet.
ErrNotImplemented = errors.New("RPC call is not implemented")
// ErrDeprecated throws an error if this RPC call is deprecated.
ErrDeprecated = errors.New("RPC call has been deprecated")
)
// Transport provides interface to server transport.
type Transport interface {
SendMessage([]byte) error
Responses() <-chan []byte
Errors() <-chan error
Close() error
}
type container struct {
content []byte
err error
}
// Client stores information about the remote server.
type Client struct {
transport Transport
handlers map[uint64]chan *container
handlersLock sync.RWMutex
pushHandlers map[string][]chan *container
pushHandlersLock sync.RWMutex
Error chan error
quit chan struct{}
nextID uint64
}
// NewClientTCP initialize a new client for remote server and connects to the remote server using TCP
func NewClientTCP(ctx context.Context, addr string) (*Client, error) {
transport, err := NewTCPTransport(ctx, addr)
if err != nil {
return nil, err
}
c := &Client{
handlers: make(map[uint64]chan *container),
pushHandlers: make(map[string][]chan *container),
Error: make(chan error),
quit: make(chan struct{}),
}
c.transport = transport
go c.listen()
return c, nil
}
// NewClientSSL initialize a new client for remote server and connects to the remote server using SSL
func NewClientSSL(ctx context.Context, addr string, config *tls.Config) (*Client, error) {
transport, err := NewSSLTransport(ctx, addr, config)
if err != nil {
return nil, err
}
c := &Client{
handlers: make(map[uint64]chan *container),
pushHandlers: make(map[string][]chan *container),
Error: make(chan error),
quit: make(chan struct{}),
}
c.transport = transport
go c.listen()
return c, nil
}
type apiErr struct {
Code int `json:"code"`
Message string `json:"message"`
}
func (e *apiErr) Error() string {
return fmt.Sprintf("errNo: %d, errMsg: %s", e.Code, e.Message)
}
type response struct {
ID uint64 `json:"id"`
Method string `json:"method"`
Error string `json:"error"`
}
func (s *Client) listen() {
for {
if s.IsShutdown() {
break
}
if s.transport == nil {
break
}
select {
case <-s.quit:
return
case err := <-s.transport.Errors():
s.Error <- err
s.Shutdown()
case bytes := <-s.transport.Responses():
result := &container{
content: bytes,
}
msg := &response{}
err := json.Unmarshal(bytes, msg)
if err != nil {
if DebugMode {
log.Printf("Unmarshal received message failed: %v", err)
}
result.err = fmt.Errorf("Unmarshal received message failed: %v", err)
} else if msg.Error != "" {
result.err = errors.New(msg.Error)
}
if len(msg.Method) > 0 {
s.pushHandlersLock.RLock()
handlers := s.pushHandlers[msg.Method]
s.pushHandlersLock.RUnlock()
for _, handler := range handlers {
select {
case handler <- result:
default:
}
}
}
s.handlersLock.RLock()
c, ok := s.handlers[msg.ID]
s.handlersLock.RUnlock()
if ok {
// TODO: very rare case. fix this memory leak, when nobody will read channel (in case of error)
c <- result
}
}
}
}
func (s *Client) listenPush(method string) <-chan *container {
c := make(chan *container, 1)
s.pushHandlersLock.Lock()
s.pushHandlers[method] = append(s.pushHandlers[method], c)
s.pushHandlersLock.Unlock()
return c
}
type request struct {
ID uint64 `json:"id"`
Method string `json:"method"`
Params []interface{} `json:"params"`
}
func (s *Client) request(ctx context.Context, method string, params []interface{}, v interface{}) error {
select {
case <-s.quit:
return ErrServerShutdown
default:
}
msg := request{
ID: atomic.AddUint64(&s.nextID, 1),
Method: method,
Params: params,
}
bytes, err := json.Marshal(msg)
if err != nil {
return err
}
bytes = append(bytes, nl)
err = s.transport.SendMessage(bytes)
if err != nil {
s.Shutdown()
return err
}
c := make(chan *container, 1)
s.handlersLock.Lock()
s.handlers[msg.ID] = c
s.handlersLock.Unlock()
defer func() {
s.handlersLock.Lock()
delete(s.handlers, msg.ID)
s.handlersLock.Unlock()
}()
var resp *container
select {
case resp = <-c:
case <-ctx.Done():
return ErrTimeout
}
if resp.err != nil {
return resp.err
}
if v != nil {
err = json.Unmarshal(resp.content, v)
if err != nil {
return err
}
}
return nil
}
func (s *Client) Shutdown() {
if !s.IsShutdown() {
close(s.quit)
}
if s.transport != nil {
_ = s.transport.Close()
}
s.transport = nil
s.handlers = nil
s.pushHandlers = nil
}
func (s *Client) IsShutdown() bool {
select {
case <-s.quit:
return true
default:
}
return false
}