summaryrefslogtreecommitdiff
path: root/hub.go
diff options
context:
space:
mode:
Diffstat (limited to 'hub.go')
-rw-r--r--hub.go86
1 files changed, 86 insertions, 0 deletions
diff --git a/hub.go b/hub.go
new file mode 100644
index 0000000..366e4b8
--- /dev/null
+++ b/hub.go
@@ -0,0 +1,86 @@
+// hub.go
+package main
+
+import "sync"
+
+type Hub struct {
+ mu sync.RWMutex
+ clients map[*Client]bool
+ byServer map[string]map[*Client]bool
+ tokens map[string]*TokenConfig
+
+ events []Event
+ nextEventID int64
+ maxEvents int
+}
+
+func NewHub(tokens map[string]*TokenConfig) *Hub {
+ return &Hub{
+ clients: make(map[*Client]bool),
+ byServer: make(map[string]map[*Client]bool),
+ tokens: tokens,
+ events: make([]Event, 0, 1024),
+ nextEventID: 1,
+ maxEvents: 1024,
+ }
+}
+
+func (h *Hub) register(c *Client) {
+ h.mu.Lock()
+ defer h.mu.Unlock()
+
+ h.clients[c] = true
+
+ if _, ok := h.byServer[c.serverID]; !ok {
+ h.byServer[c.serverID] = make(map[*Client]bool)
+ }
+ h.byServer[c.serverID][c] = true
+}
+
+func (h *Hub) unregister(c *Client) {
+ h.mu.Lock()
+ defer h.mu.Unlock()
+
+ if _, ok := h.clients[c]; ok {
+ delete(h.clients, c)
+ }
+ if m, ok := h.byServer[c.serverID]; ok {
+ delete(m, c)
+ if len(m) == 0 {
+ delete(h.byServer, c.serverID)
+ }
+ }
+ close(c.send)
+}
+
+func (h *Hub) broadcastToServer(serverID string, payload []byte) {
+ h.mu.RLock()
+ defer h.mu.RUnlock()
+
+ clients := h.byServer[serverID]
+ for c := range clients {
+ select {
+ case c.send <- payload:
+ default:
+ // channel full -> drop client
+ go func(cc *Client) {
+ cc.conn.Close()
+ }(c)
+ }
+ }
+}
+
+func (h *Hub) broadcastAll(payload []byte) {
+ h.mu.RLock()
+ defer h.mu.RUnlock()
+
+ for c := range h.clients {
+ select {
+ case c.send <- payload:
+ default:
+ go func(cc *Client) {
+ cc.conn.Close()
+ }(c)
+ }
+ }
+} \ No newline at end of file