...

Source file src/github.com/LINBIT/golinstor/client/sse.go

Documentation: github.com/LINBIT/golinstor/client

     1  package client
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  
     7  	"github.com/donovanhide/eventsource"
     8  )
     9  
    10  type EventMayPromoteChange struct {
    11  	ResourceName string `json:"resource_name,omitempty"`
    12  	NodeName     string `json:"node_name,omitempty"`
    13  	MayPromote   bool   `json:"may_promote,omitempty"`
    14  }
    15  
    16  // custom code
    17  
    18  // EventProvider acts as an abstraction for an EventService. It can be swapped
    19  // out for another EventService implementation, for example for testing.
    20  type EventProvider interface {
    21  	// DRBDPromotion is used to subscribe to LINSTOR DRBD Promotion events
    22  	DRBDPromotion(ctx context.Context, lastEventId string) (*DRBDMayPromoteStream, error)
    23  }
    24  
    25  const mayPromoteChange = "may-promote-change"
    26  
    27  // EventService is the service that deals with LINSTOR server side event streams.
    28  type EventService struct {
    29  	client *Client
    30  }
    31  
    32  // DRBDMayPromoteStream is a struct that contains a channel of EventMayPromoteChange events
    33  // It has a Close() method that needs to be called/defered.
    34  type DRBDMayPromoteStream struct {
    35  	Events chan EventMayPromoteChange
    36  	stream *eventsource.Stream
    37  }
    38  
    39  // Close is used to close the underlying stream and all Go routines
    40  func (dmp *DRBDMayPromoteStream) Close() {
    41  	dmp.stream.Close()
    42  }
    43  
    44  // suscribe handles stream creation, event splitting, and context cancelation
    45  func (e *EventService) subscribe(ctx context.Context, url, event, lastEventId string) (*eventsource.Stream, chan interface{}, error) {
    46  	stream, err := e.client.doEvent(ctx, url, lastEventId)
    47  	if err != nil {
    48  		return nil, nil, err
    49  	}
    50  
    51  	ch := make(chan interface{})
    52  	go func() {
    53  		defer close(ch)
    54  		for {
    55  			select {
    56  			case ev, ok := <-stream.Events:
    57  				if !ok { // most likely someone called Close()
    58  					return
    59  				}
    60  				if ev.Event() == event {
    61  					switch event {
    62  					case mayPromoteChange:
    63  						var empc EventMayPromoteChange
    64  						if err := json.Unmarshal([]byte(ev.Data()), &empc); err == nil {
    65  							ch <- empc
    66  						}
    67  					}
    68  				}
    69  			case <-ctx.Done():
    70  				return
    71  			}
    72  		}
    73  	}()
    74  
    75  	return stream, ch, nil
    76  }
    77  
    78  // DRBDPromotion is used to subscribe to LINSTOR DRBD Promotion events
    79  func (e *EventService) DRBDPromotion(ctx context.Context, lastEventId string) (*DRBDMayPromoteStream, error) {
    80  	stream, ch, err := e.subscribe(ctx, "/v1/events/drbd/promotion", mayPromoteChange, lastEventId)
    81  	if err != nil {
    82  		return nil, err
    83  	}
    84  
    85  	empch := make(chan EventMayPromoteChange)
    86  	go func() {
    87  		defer close(empch)
    88  		for ev := range ch {
    89  			if e, ok := ev.(EventMayPromoteChange); ok {
    90  				empch <- e
    91  			}
    92  		}
    93  	}()
    94  
    95  	return &DRBDMayPromoteStream{
    96  		Events: empch,
    97  		stream: stream,
    98  	}, nil
    99  }
   100  

View as plain text