...

Source file src/github.com/moby/spdystream/priority.go

Documentation: github.com/moby/spdystream

     1  /*
     2     Copyright 2014-2021 Docker Inc.
     3  
     4     Licensed under the Apache License, Version 2.0 (the "License");
     5     you may not use this file except in compliance with the License.
     6     You may obtain a copy of the License at
     7  
     8         http://www.apache.org/licenses/LICENSE-2.0
     9  
    10     Unless required by applicable law or agreed to in writing, software
    11     distributed under the License is distributed on an "AS IS" BASIS,
    12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13     See the License for the specific language governing permissions and
    14     limitations under the License.
    15  */
    16  
    17  package spdystream
    18  
    19  import (
    20  	"container/heap"
    21  	"sync"
    22  
    23  	"github.com/moby/spdystream/spdy"
    24  )
    25  
    26  type prioritizedFrame struct {
    27  	frame    spdy.Frame
    28  	priority uint8
    29  	insertId uint64
    30  }
    31  
    32  type frameQueue []*prioritizedFrame
    33  
    34  func (fq frameQueue) Len() int {
    35  	return len(fq)
    36  }
    37  
    38  func (fq frameQueue) Less(i, j int) bool {
    39  	if fq[i].priority == fq[j].priority {
    40  		return fq[i].insertId < fq[j].insertId
    41  	}
    42  	return fq[i].priority < fq[j].priority
    43  }
    44  
    45  func (fq frameQueue) Swap(i, j int) {
    46  	fq[i], fq[j] = fq[j], fq[i]
    47  }
    48  
    49  func (fq *frameQueue) Push(x interface{}) {
    50  	*fq = append(*fq, x.(*prioritizedFrame))
    51  }
    52  
    53  func (fq *frameQueue) Pop() interface{} {
    54  	old := *fq
    55  	n := len(old)
    56  	*fq = old[0 : n-1]
    57  	return old[n-1]
    58  }
    59  
    60  type PriorityFrameQueue struct {
    61  	queue        *frameQueue
    62  	c            *sync.Cond
    63  	size         int
    64  	nextInsertId uint64
    65  	drain        bool
    66  }
    67  
    68  func NewPriorityFrameQueue(size int) *PriorityFrameQueue {
    69  	queue := make(frameQueue, 0, size)
    70  	heap.Init(&queue)
    71  
    72  	return &PriorityFrameQueue{
    73  		queue: &queue,
    74  		size:  size,
    75  		c:     sync.NewCond(&sync.Mutex{}),
    76  	}
    77  }
    78  
    79  func (q *PriorityFrameQueue) Push(frame spdy.Frame, priority uint8) {
    80  	q.c.L.Lock()
    81  	defer q.c.L.Unlock()
    82  	for q.queue.Len() >= q.size {
    83  		q.c.Wait()
    84  	}
    85  	pFrame := &prioritizedFrame{
    86  		frame:    frame,
    87  		priority: priority,
    88  		insertId: q.nextInsertId,
    89  	}
    90  	q.nextInsertId = q.nextInsertId + 1
    91  	heap.Push(q.queue, pFrame)
    92  	q.c.Signal()
    93  }
    94  
    95  func (q *PriorityFrameQueue) Pop() spdy.Frame {
    96  	q.c.L.Lock()
    97  	defer q.c.L.Unlock()
    98  	for q.queue.Len() == 0 {
    99  		if q.drain {
   100  			return nil
   101  		}
   102  		q.c.Wait()
   103  	}
   104  	frame := heap.Pop(q.queue).(*prioritizedFrame).frame
   105  	q.c.Signal()
   106  	return frame
   107  }
   108  
   109  func (q *PriorityFrameQueue) Drain() {
   110  	q.c.L.Lock()
   111  	defer q.c.L.Unlock()
   112  	q.drain = true
   113  	q.c.Broadcast()
   114  }
   115  

View as plain text