279 lines
5.5 KiB
Go
279 lines
5.5 KiB
Go
package electrum
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"sync"
|
|
"sync/atomic"
|
|
)
|
|
|
|
const (
|
|
// ClientVersion identifies the client version/name to the remote server
|
|
ClientVersion = "go-electrum1.1"
|
|
|
|
// ProtocolVersion identifies the support protocol version to the remote server
|
|
ProtocolVersion = "1.4"
|
|
|
|
nl = byte('\n')
|
|
)
|
|
|
|
var (
|
|
// DebugMode provides debug output on communications with the remote server if enabled.
|
|
DebugMode bool
|
|
|
|
// ErrServerConnected throws an error if remote server is already connected.
|
|
ErrServerConnected = errors.New("server is already connected")
|
|
|
|
// ErrServerShutdown throws an error if remote server has shutdown.
|
|
ErrServerShutdown = errors.New("server has shutdown")
|
|
|
|
// ErrTimeout throws an error if request has timed out
|
|
ErrTimeout = errors.New("request timeout")
|
|
|
|
// ErrNotImplemented throws an error if this RPC call has not been implemented yet.
|
|
ErrNotImplemented = errors.New("RPC call is not implemented")
|
|
|
|
// ErrDeprecated throws an error if this RPC call is deprecated.
|
|
ErrDeprecated = errors.New("RPC call has been deprecated")
|
|
)
|
|
|
|
// Transport provides interface to server transport.
|
|
type Transport interface {
|
|
SendMessage([]byte) error
|
|
Responses() <-chan []byte
|
|
Errors() <-chan error
|
|
Close() error
|
|
}
|
|
|
|
type container struct {
|
|
content []byte
|
|
err error
|
|
}
|
|
|
|
// Client stores information about the remote server.
|
|
type Client struct {
|
|
transport Transport
|
|
|
|
handlers map[uint64]chan *container
|
|
handlersLock sync.RWMutex
|
|
|
|
pushHandlers map[string][]chan *container
|
|
pushHandlersLock sync.RWMutex
|
|
|
|
Error chan error
|
|
quit chan struct{}
|
|
|
|
nextID uint64
|
|
}
|
|
|
|
// NewClientTCP initialize a new client for remote server and connects to the remote server using TCP
|
|
func NewClientTCP(ctx context.Context, addr string) (*Client, error) {
|
|
transport, err := NewTCPTransport(ctx, addr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
c := &Client{
|
|
handlers: make(map[uint64]chan *container),
|
|
pushHandlers: make(map[string][]chan *container),
|
|
|
|
Error: make(chan error),
|
|
quit: make(chan struct{}),
|
|
}
|
|
|
|
c.transport = transport
|
|
go c.listen()
|
|
|
|
return c, nil
|
|
}
|
|
|
|
// NewClientSSL initialize a new client for remote server and connects to the remote server using SSL
|
|
func NewClientSSL(ctx context.Context, addr string, config *tls.Config) (*Client, error) {
|
|
transport, err := NewSSLTransport(ctx, addr, config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
c := &Client{
|
|
handlers: make(map[uint64]chan *container),
|
|
pushHandlers: make(map[string][]chan *container),
|
|
|
|
Error: make(chan error),
|
|
quit: make(chan struct{}),
|
|
}
|
|
|
|
c.transport = transport
|
|
go c.listen()
|
|
|
|
return c, nil
|
|
}
|
|
|
|
type apiErr struct {
|
|
Code int `json:"code"`
|
|
Message string `json:"message"`
|
|
}
|
|
|
|
func (e *apiErr) Error() string {
|
|
return fmt.Sprintf("errNo: %d, errMsg: %s", e.Code, e.Message)
|
|
}
|
|
|
|
type response struct {
|
|
ID uint64 `json:"id"`
|
|
Method string `json:"method"`
|
|
Error string `json:"error"`
|
|
}
|
|
|
|
func (s *Client) listen() {
|
|
for {
|
|
if s.IsShutdown() {
|
|
break
|
|
}
|
|
if s.transport == nil {
|
|
break
|
|
}
|
|
select {
|
|
case <-s.quit:
|
|
return
|
|
case err := <-s.transport.Errors():
|
|
s.Error <- err
|
|
s.Shutdown()
|
|
case bytes := <-s.transport.Responses():
|
|
result := &container{
|
|
content: bytes,
|
|
}
|
|
|
|
msg := &response{}
|
|
err := json.Unmarshal(bytes, msg)
|
|
if err != nil {
|
|
if DebugMode {
|
|
log.Printf("Unmarshal received message failed: %v", err)
|
|
}
|
|
result.err = fmt.Errorf("Unmarshal received message failed: %v", err)
|
|
} else if msg.Error != "" {
|
|
result.err = errors.New(msg.Error)
|
|
}
|
|
|
|
if len(msg.Method) > 0 {
|
|
s.pushHandlersLock.RLock()
|
|
handlers := s.pushHandlers[msg.Method]
|
|
s.pushHandlersLock.RUnlock()
|
|
|
|
for _, handler := range handlers {
|
|
select {
|
|
case handler <- result:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
s.handlersLock.RLock()
|
|
c, ok := s.handlers[msg.ID]
|
|
s.handlersLock.RUnlock()
|
|
|
|
if ok {
|
|
// TODO: very rare case. fix this memory leak, when nobody will read channel (in case of error)
|
|
c <- result
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Client) listenPush(method string) <-chan *container {
|
|
c := make(chan *container, 1)
|
|
s.pushHandlersLock.Lock()
|
|
s.pushHandlers[method] = append(s.pushHandlers[method], c)
|
|
s.pushHandlersLock.Unlock()
|
|
|
|
return c
|
|
}
|
|
|
|
type request struct {
|
|
ID uint64 `json:"id"`
|
|
Method string `json:"method"`
|
|
Params []interface{} `json:"params"`
|
|
}
|
|
|
|
func (s *Client) request(ctx context.Context, method string, params []interface{}, v interface{}) error {
|
|
select {
|
|
case <-s.quit:
|
|
return ErrServerShutdown
|
|
default:
|
|
}
|
|
|
|
msg := request{
|
|
ID: atomic.AddUint64(&s.nextID, 1),
|
|
Method: method,
|
|
Params: params,
|
|
}
|
|
|
|
bytes, err := json.Marshal(msg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
bytes = append(bytes, nl)
|
|
|
|
err = s.transport.SendMessage(bytes)
|
|
if err != nil {
|
|
s.Shutdown()
|
|
return err
|
|
}
|
|
|
|
c := make(chan *container, 1)
|
|
|
|
s.handlersLock.Lock()
|
|
s.handlers[msg.ID] = c
|
|
s.handlersLock.Unlock()
|
|
|
|
defer func() {
|
|
s.handlersLock.Lock()
|
|
delete(s.handlers, msg.ID)
|
|
s.handlersLock.Unlock()
|
|
}()
|
|
|
|
var resp *container
|
|
select {
|
|
case resp = <-c:
|
|
case <-ctx.Done():
|
|
return ErrTimeout
|
|
}
|
|
|
|
if resp.err != nil {
|
|
return resp.err
|
|
}
|
|
|
|
if v != nil {
|
|
err = json.Unmarshal(resp.content, v)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Client) Shutdown() {
|
|
if !s.IsShutdown() {
|
|
close(s.quit)
|
|
}
|
|
if s.transport != nil {
|
|
_ = s.transport.Close()
|
|
}
|
|
s.transport = nil
|
|
s.handlers = nil
|
|
s.pushHandlers = nil
|
|
}
|
|
|
|
func (s *Client) IsShutdown() bool {
|
|
select {
|
|
case <-s.quit:
|
|
return true
|
|
default:
|
|
}
|
|
return false
|
|
}
|