Efficient TCP Server Connection Management
TCP connections are everywhere: almost every online interaction you make, whether it's streaming your favourite video, sending an important email, or casually browsing websites, rides on top of one. They are the foundational building blocks of the internet, so it's important for them to be consistent and reliable.
TCP servers receive and handle billions of requests a day, so it's important for them to do this efficiently. The common techniques covered in this article are asynchronous I/O multiplexing (select, poll, and epoll), thread pools, and goroutines.
Below is a simple example that multiplexes connections with poll:
package main

import (
    "fmt"
    "net"
    "os"
    "syscall"
)

func main() {
    const maxConnections = 10 // You can adjust this
    pollfds := make([]syscall.PollFd, maxConnections)
    connections := make([]net.Conn, maxConnections)
    activeConnections := 0 // Number of currently active connections

    // Set up listener
    listener, err := net.Listen("tcp", ":8080")
    if err != nil {
        fmt.Println("Error setting up listener:", err)
        os.Exit(1)
    }
    defer listener.Close()
    fmt.Println("Server listening on port 8080")

    for {
        // Accept new connections while there is room.
        // Note: Accept blocks, so a production server would also poll the
        // listener's own descriptor rather than blocking here.
        if activeConnections < maxConnections {
            conn, err := listener.Accept()
            if err != nil {
                fmt.Println("Error accepting connection:", err)
                continue
            }
            // Get the file descriptor from the connection
            // (SyscallConn is defined on *net.TCPConn, not on net.Conn)
            rawConn, err := conn.(*net.TCPConn).SyscallConn()
            if err != nil {
                fmt.Println("Error getting syscall connection:", err)
                conn.Close()
                continue
            }
            var fd int
            err = rawConn.Control(func(f uintptr) {
                fd = int(f)
            })
            if err != nil {
                fmt.Println("Error getting file descriptor:", err)
                conn.Close()
                continue
            }
            pollfds[activeConnections] = syscall.PollFd{
                Fd:     int32(fd),
                Events: syscall.POLLIN, // Monitor for readability
            }
            connections[activeConnections] = conn
            activeConnections++
            fmt.Println("New connection established")
        }

        // Wait for events using poll(); -1 blocks indefinitely
        _, err := syscall.Poll(pollfds[:activeConnections], -1)
        if err != nil {
            fmt.Println("Error in poll:", err)
            continue
        }

        // Handle events
        for i := 0; i < activeConnections; i++ {
            if pollfds[i].Revents&syscall.POLLIN != 0 {
                conn := connections[i]
                buf := make([]byte, 1024)
                n, err := conn.Read(buf)
                if err != nil {
                    fmt.Println("Error reading from connection:", err)
                    conn.Close()
                    // Swap-remove: move the last entry into slot i.
                    // The swapped-in entry is checked on the next poll round.
                    pollfds[i] = pollfds[activeConnections-1]
                    connections[i] = connections[activeConnections-1]
                    activeConnections--
                    continue
                }
                fmt.Printf("Received data: %s\n", string(buf[:n]))
            }
        }
    }
}
To run this code snippet, you need to have Go installed on your system. Save it to a file called main.go and run it with
go run main.go
You can use the curl request below to test the server (note that, because the server speaks raw TCP, the printed data will include the full HTTP request, headers and all):
curl -X POST --data "Hello, Server!" http://localhost:8080
Now, when you make several telnet or curl calls with keep-alive on or off, you will see the following message:
New connection established
When data is sent to the server, it will say
Received data: Hello, Server!
Let's delve a bit deeper into how select, poll and epoll work. All three mechanisms are fundamentally designed for asynchronous I/O. They allow a program to monitor multiple file descriptors and get notified when one or more become ready for I/O operations (reading, writing, etc.). But there are some key differences between them.
Scalability
select: This scales poorly to large numbers of file descriptors. The entire fd set is passed in and linearly scanned on each invocation, and the set size is capped (typically at 1024 by FD_SETSIZE), which becomes a bottleneck.
poll: Improves upon select, though it may still exhibit overhead when dealing with numerous descriptors.
epoll (Linux-specific): Designed for high scalability scenarios. It introduces a concept of an "event list" maintained by the kernel, which avoids the need to rescan all file descriptors on every epoll_wait call.
Handling of File Descriptor Updates:
select and poll are stateless: the full set of monitored descriptors is copied from user space to the kernel on every call. epoll is stateful: descriptors are registered once with epoll_ctl, and subsequent epoll_wait calls reuse that kernel-side interest list.
Level-Triggered vs. Edge-Triggered:
select and poll are level-triggered: a descriptor is reported as ready for as long as the condition holds. epoll supports both modes; in edge-triggered mode (EPOLLET) it notifies only on the transition to ready, which cuts redundant wakeups but requires draining the descriptor completely before waiting again.
Use Case
select and poll suit portable code with modest connection counts; epoll is the tool of choice for Linux servers juggling thousands of connections. Thread pools are another common technique for handling many connections; here is a simple worker-pool example in Go:
package main

import (
    "fmt"
    "sync"
    "time"
)

// Simulate a task/job for the thread pool
type Job struct {
    id int
}

// The actual worker function performing tasks
func worker(id int, jobs <-chan Job, wg *sync.WaitGroup) {
    defer wg.Done() // Indicate to WaitGroup that the worker has finished
    for job := range jobs {
        fmt.Println("Worker", id, "started job", job.id)
        time.Sleep(time.Second) // Simulate work
        fmt.Println("Worker", id, "finished job", job.id)
    }
}

func main() {
    const numJobs = 10
    const numWorkers = 3

    jobs := make(chan Job, numJobs) // Buffered channel for jobs
    var wg sync.WaitGroup           // To track when all workers are done

    // Create the thread pool
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go worker(i, jobs, &wg)
    }

    // Submit jobs to the pool (don't reuse the name "jobs" here,
    // or the channel gets shadowed and the send won't compile)
    for i := 0; i < numJobs; i++ {
        jobs <- Job{id: i}
    }
    close(jobs) // No more jobs will be submitted

    // Wait for all workers to finish
    wg.Wait()
}
Common Problem With Thread Pools
Thread pools are a great way to handle incoming traffic, but a problem arises when the pool is full and new connections keep arriving. It then becomes crucial how you store the overflow so it can be processed once the pool frees up. The common strategies are: maintaining a queue of pending connections, rejecting new connections until the pool is free again, or dynamically scaling the thread pool. The technique I used was maintaining a queue: new connections were tracked until the thread pool was free again, and then taken off the queue one at a time and processed in a goroutine. The rejection approach was not an option, as I needed to capture tests for all incoming requests even when the pool was full. Dynamically scaling the pool is a tedious undertaking, because the application can run on machines with very different amounts of resources, so the information needed to scale goroutines on demand simply isn't available.
It is important to note that although we are using a queue to store the connections, we still need to give it a specific size, based on the resources available and the traffic bursts expected. I used a simple FIFO (first-in, first-out) queue because I had to handle all connections the same way, but you can use a priority queue if your implementation demands it. Here is a code sample showing this approach:
package main

import (
    "fmt"
    "net"
    "sync"
    "time"
)

const (
    maxQueueSize   = 20
    threadPoolSize = 5
)

var (
    connectionQueue []net.Conn
    queueMutex      sync.Mutex
)

func main() {
    listener, err := net.Listen("tcp", "localhost:8000")
    if err != nil {
        fmt.Println("Error setting up listener:", err)
        return
    }
    defer listener.Close()

    // Start worker threads
    for i := 0; i < threadPoolSize; i++ {
        go worker()
    }

    // Accept and enqueue connections
    for {
        conn, err := listener.Accept()
        if err != nil {
            fmt.Println("Error accepting connection:", err)
            continue
        }
        enqueueConnection(conn)
    }
}

func enqueueConnection(conn net.Conn) {
    queueMutex.Lock()
    defer queueMutex.Unlock()
    if len(connectionQueue) == maxQueueSize {
        fmt.Println("Queue full, cannot accept connection at the moment")
        conn.Close() // Don't leak the rejected connection
        return
    }
    connectionQueue = append(connectionQueue, conn)
    fmt.Println("Connection enqueued")
}

func worker() {
    for {
        conn := dequeueConnection()
        if conn != nil {
            handleConnection(conn)
        } else {
            // Let the worker rest briefly if the queue is empty
            time.Sleep(100 * time.Millisecond)
        }
    }
}

func dequeueConnection() net.Conn {
    queueMutex.Lock()
    defer queueMutex.Unlock()
    if len(connectionQueue) == 0 {
        return nil
    }
    conn := connectionQueue[0]
    connectionQueue = connectionQueue[1:]
    return conn
}

func handleConnection(conn net.Conn) {
    // Handle your connection here.
    conn.Close()
}
Goroutines make per-connection handling simple: each connection gets its own handler. The version below also tracks open connections in a slice, and removes entries through a channel that handlers signal on when they finish (you cannot receive from a slice in a select, so the channel is required):
package main

import (
    "fmt"
    "net"
    "time"
)

func handleConnection(conn net.Conn, closed chan<- net.Conn) {
    defer func() {
        conn.Close()
        closed <- conn // Tell the main loop this connection is done
    }()
    // Simulate connection activity with a loop
    for {
        buf := make([]byte, 1024)
        n, err := conn.Read(buf)
        if err != nil {
            fmt.Println("Error reading from connection:", err)
            return // Connection closed or error
        }
        fmt.Printf("Received data from %v: %s\n", conn.RemoteAddr(), buf[:n])
        // Simulate some processing time
        time.Sleep(500 * time.Millisecond)
    }
}

func main() {
    listener, err := net.Listen("tcp", ":8080")
    if err != nil {
        fmt.Println("Error creating listener:", err)
        return
    }
    defer listener.Close()

    connections := make([]net.Conn, 0)
    closed := make(chan net.Conn)

    for {
        conn, err := listener.Accept()
        if err != nil {
            fmt.Println("Error accepting connection:", err)
            continue
        }
        connections = append(connections, conn)
        go handleConnection(conn, closed) // Handle in a separate goroutine

        // Remove any connection whose handler has finished
        select {
        case done := <-closed:
            for i, c := range connections {
                if c == done {
                    connections = append(connections[:i], connections[i+1:]...)
                    break
                }
            }
        default:
            // No closed connections right now; go back to accepting
        }
    }
}
So, as we have seen, picking the correct approach to handling a large number of connections greatly impacts how TCP servers meet the ever-growing demands of the internet. From thread pools to goroutines, combined with OS-level mechanisms like epoll, developers have powerful tools to build robust and scalable servers.
So the next time you stream a video, send a message, or browse a website, remember the intricate dance of TCP connections and concurrency techniques working tirelessly behind the scenes. It is these techniques that make our seamless online experiences possible.
Conclusion
TCP connections are the backbone of the internet, powering everything from video streaming to email communications. Behind the scenes, efficient handling of these connections is critical for building robust, high-performance servers. Techniques like asynchronous I/O (select, poll, epoll), thread pools, and goroutines showcase the evolution of concurrency management, each offering unique advantages and trade-offs depending on the application's requirements.
For lightweight, high-concurrency applications, goroutines and epoll shine as they balance performance with scalability. Thread pools, though slightly heavier, offer structured control, particularly in environments with resource constraints. Meanwhile, asynchronous I/O mechanisms provide the foundation for scalable and responsive servers.
Choosing the right approach depends on the specific use case, hardware resources, and expected traffic patterns. Understanding these strategies equips developers to optimize server performance and deliver seamless user experiences.
FAQs
What is the main difference between select, poll, and epoll?
select and poll are older mechanisms that work well for a small number of file descriptors but face scalability issues due to linear scans. epoll, specific to Linux, is optimized for high-performance scenarios with many file descriptors, using event lists maintained by the kernel for efficiency.
Why are goroutines considered better than traditional threads for TCP servers?
Goroutines are lightweight, efficient, and managed by Go’s runtime scheduler. Unlike threads, they don’t require extensive memory and context-switching overhead, allowing thousands of goroutines to run concurrently in a single application.
How does a thread pool improve server performance?
A thread pool preallocates a fixed number of threads, reducing the overhead of creating and destroying threads for each connection. This ensures better resource management and limits the number of active threads to prevent overwhelming the system.
What is edge-triggered mode in epoll, and when should it be used?
In edge-triggered mode, epoll notifies an application only when a descriptor's readiness state changes (for example, when new data arrives), not continuously while the condition persists. This reduces redundant notifications, making it suitable for high-performance scenarios where efficient event handling is critical. However, it requires careful coding: the application must drain the descriptor completely (until EAGAIN) or it can miss events.
This article was originally published on Keploy.io