Mastering gRPC server with graceful shutdown within Golang’s Hexagonal Architecture

Introduction

In my previous blog, we implemented Hexagonal Architecture with Golang and employed the Gin library as our HTTP web server. Today, our exploration continues with gRPC — a high-performance RPC framework.

But now, imagine that one fine day your service crashes or shuts down suddenly, and consider the consequences: resource leaks, incomplete transactions, and chaos across the microservices ecosystem. What is the solution to this problem?

In this blog, we will delve into implementing a gRPC server within the confines of Hexagonal Architecture using Golang. Discover the advantages of gRPC, the pitfalls of neglecting a shutdown hook, and the art of clean, graceful shutdown in the service.

Why do we prefer gRPC for microservices development?

Many blog posts already introduce gRPC, and they are easy to find. Today, we won’t go into the details; we will only cover the main advantages it brings, which are also our motivation for using it.

Microservices development demands a communication protocol that excels in efficiency, flexibility, and scalability. Here’s why gRPC is our top choice:

  • Efficiency with HTTP/2: Utilizes HTTP/2 for simultaneous multiplexing of requests, minimizing latency.

  • Compact Serialization: Employs Protocol Buffers for compact and fast data serialization.

  • Language-Agnostic: Supports multiple programming languages, offering flexibility in implementation.

  • Bidirectional Streaming: Facilitates real-time communication with bidirectional streaming support.

  • Automatic Code Generation: Simplifies development with automatic code generation for APIs in various languages.

  • Self-Documenting APIs: Ensures clear documentation with self-documenting gRPC APIs.

  • Strong Typing and Code Generation: Reduces integration errors through strong typing and automatic code generation.

  • Interoperability and Ecosystem: Seamlessly integrates with various tools and technologies, leveraging a rich ecosystem.

You can also read here for a more in-depth comparison between gRPC and RESTful protocols.

After choosing a suitable framework for the microservice model, we move on to the important part of today’s blog post, which is how to solve the problem of gracefully shutting down services.

Why is it important to shut down services gracefully?

Source: jobc.tistory.com/219

Suddenly stopping a service can lead to a number of problems: work left half-done, wasted resources, and possibly corrupted data. A smooth shutdown makes sure a service finishes its work, gives back what it borrowed, and leaves without making a mess for the whole system.

Preventing data corruption

Suddenly terminating a service can lead to incomplete transactions, potentially resulting in data corruption. A graceful shutdown ensures ongoing processes finish correctly, preserving the integrity of your data.

Avoiding resource leaks

Failing to release acquired resources during a sudden shutdown can result in resource leaks. This can hurt the overall performance of your system by tying up resources, such as service connections and database connections, that should be made available for other tasks.

Maintaining communication integrity

Microservices often rely on seamless communication. A sudden shutdown may leave connections hanging, disrupting the flow of information between services. Graceful shutdowns help avoid communication breakdowns and ensure the stability of the entire system.

Completing ongoing tasks

Services may be in the middle of important tasks when a shutdown is initiated. A graceful exit allows these tasks to reach completion, preventing any remaining incomplete actions that could lead to errors.

Ensuring predictable system behavior

Predictability is key in microservices. A graceful shutdown provides a predictable exit strategy, allowing other services to await and adapt to the changes. This helps maintain overall system stability.

Minimizing downtime impact

In scenarios where services need to be restarted or updated, a graceful shutdown minimizes downtime impact. Services can be gracefully taken offline and brought back up without causing disruptions to the entire system.

The aspects above show why shutting down a service gracefully matters. So, how do we shut down gRPC services the right way? In the next part of this blog, we’ll go through some simple strategies and code examples that show how to implement a gRPC service with graceful shutdown. Let’s go!

Implement gRPC server in the Hexagonal architecture

Recall the structure of the Hexagonal Architecture project from the previous blog. In the tree below, I added folders to clearly separate the HTTP and gRPC protocols.

├── Dockerfile
├── api
│   ├── buf.yaml
│   ├── user_service.pb.go
│   ├── user_service.proto
│   └── user_service_grpc.pb.go
├── buf.gen.yaml
├── buf.work.yaml
├── build_proto.sh
├── cmd
│   ├── grpc
│   │   └── runner.go
│   └── http
│       └── runner.go
├── conf
│   └── app.yaml
├── go.mod
├── go.sum
├── internal
│   ├── controller
│   │   ├── grpc
│   │   │   └── controller.go
│   │   └── http
│   │       └── controller.go
│   ├── core
│   │   ├── common
│   │   │   ├── router
│   │   │   │   └── router.go
│   │   │   └── utils
│   │   │       ├── converter.go
│   │   │       ├── datetime.go
│   │   │       └── logger.go
│   │   ├── config
│   │   │   └── config.go
│   │   ├── dto
│   │   │   └── user.go
│   │   ├── entity
│   │   │   └── error_code
│   │   │       └── error_code.go
│   │   ├── model
│   │   │   ├── request
│   │   │   │   └── request.go
│   │   │   └── response
│   │   │       ├── response.go
│   │   │       └── sign_up.go
│   │   ├── port
│   │   │   ├── repository
│   │   │   │   ├── db.go
│   │   │   │   └── user.go
│   │   │   └── service
│   │   │       └── user.go
│   │   ├── server
│   │   │   ├── grpc
│   │   │   │   └── grpc_server.go
│   │   │   └── http
│   │   │       └── http_server.go
│   │   └── service
│   │       ├── user.go
│   │       └── user_test.go
│   └── infra
│       ├── config
│       │   └── config.go
│       └── repository
│           ├── db.go
│           └── user.go
├── schema
│   └── schema.sql
└── script
    └── run.sh

To demonstrate how flexibly the Hexagonal Architecture adapts to change, we only need to write an additional gRPC controller (primary adapter) and gRPC server (core) to serve requests from clients. There is no need to modify the core service or the infrastructure. The image below shows the structure that will be implemented.

First, we start by implementing the gRPC server.

Building a gRPC Server

  • Prepare Protobuf file

To set up the gRPC server, it is essential to create a Protobuf file for our service. In our previous blog, we introduced the implementation of the User service, specifically focusing on the SignUp API. The design of this service is outlined in the Protobuf file below:

// ./api/user_service.proto

syntax = "proto3";

package protobuf.user.service;

option go_package = "user/proto";

enum ErrorCode {
  EC_UNKNOWN = 0;
  SUCCESS = 1;

  INVALID_REQUEST = 2;
  DUPLICATE_USER = 3;
}

message SignUpRequest{
  string user_name = 1;
  string password = 2;
}

message SignUpResponse{
  bool status = 1;
  ErrorCode error_code = 2;
  string error_message = 3;
  string display_name = 4;
}

service UserService{
  rpc SignUp(SignUpRequest) returns (SignUpResponse);
}

I use Buf to generate Golang code from the Protobuf file. After running the generation command on the proto file above, we get the following code files:

  • user_service.pb.go: contains the generated code for the messages and enums defined in the Protobuf file, such as SignUpRequest, SignUpResponse, and ErrorCode.
// ./api/user_service.pb.go

// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
//  protoc-gen-go v1.28.1
//  protoc        (unknown)
// source: user_service.proto

package proto

import (
 protoreflect "google.golang.org/protobuf/reflect/protoreflect"
 protoimpl "google.golang.org/protobuf/runtime/protoimpl"
 reflect "reflect"
 sync "sync"
)

const (
 // Verify that this generated code is sufficiently up-to-date.
 _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
 // Verify that runtime/protoimpl is sufficiently up-to-date.
 _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)

type ErrorCode int32

const (
 ErrorCode_EC_UNKNOWN      ErrorCode = 0
 ErrorCode_SUCCESS         ErrorCode = 1
 ErrorCode_INVALID_REQUEST ErrorCode = 2
 ErrorCode_DUPLICATE_USER  ErrorCode = 3
)

..... Detail code in my github repo
  • user_service_grpc.pb.go: contains the generated code for the gRPC server and client, such as UserServiceClient and UserServiceServer.
// ./api/user_service_grpc.pb.go

// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc             (unknown)
// source: user_service.proto

package proto

import (
 context "context"
 grpc "google.golang.org/grpc"
 codes "google.golang.org/grpc/codes"
 status "google.golang.org/grpc/status"
)

// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7

// UserServiceClient is the client API for UserService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type UserServiceClient interface {
 SignUp(ctx context.Context, in *SignUpRequest, opts ...grpc.CallOption) (*SignUpResponse, error)
}

..... Detail code in my github repo

Regarding the gRPC client, we’ll cover that in a separate blog post. Now, let’s proceed to implement the gRPC server for the UserService.

  • Implement the gRPC server
// ./internal/core/server/grpc/grpc_server.go

import (
 "io"
 ...
)

type GRPCServer interface {
    Start(serviceRegister func(server *grpc.Server))
    io.Closer
}

type gRPCServer struct {
    grpcServer *grpc.Server
    config     config.GrpcServerConfig
}

The GRPCServer interface outlines the fundamental methods required for a gRPC server. The Start function launches the server with a service registration function. The interface also embeds io.Closer, so every gRPC server implementation must provide a Close method.

The gRPCServer struct encapsulates the gRPC server instance and configuration details. It acts as the implementation of the GRPCServer interface.

// ./internal/core/server/grpc/grpc_server.go

func NewGrpcServer(config config.GrpcServerConfig) (GRPCServer, error) {
    options, err := buildOptions(config)
    if err != nil {
        return nil, err
    }

    server := grpc.NewServer(options...)

    return &gRPCServer{
        config:     config,
        grpcServer: server,
    }, nil
}

This function creates a new instance of the gRPC server, initializing it with the provided configuration and options obtained from the buildOptions function.

// ./internal/core/server/grpc/grpc_server.go

func buildOptions(config config.GrpcServerConfig) ([]grpc.ServerOption, error) {
    return []grpc.ServerOption{
        grpc.KeepaliveParams(buildKeepaliveParams(config.KeepaliveParams)),
        grpc.KeepaliveEnforcementPolicy(buildKeepalivePolicy(config.KeepalivePolicy)),
    }, nil
}

The buildOptions function constructs gRPC server options based on the provided configuration. ServerOption covers many other settings besides keepalive, such as TLS, interceptors, and read/write buffer sizes; we will discuss them in the next blog.

buildKeepalivePolicy and buildKeepaliveParams define the behavior of keepalive functionality in a gRPC server. Let's break down each function:

// ./internal/core/server/grpc/grpc_server.go

func buildKeepalivePolicy(config keepalive.EnforcementPolicy) keepalive.EnforcementPolicy {
    return keepalive.EnforcementPolicy{
        MinTime:             config.MinTime * time.Second,
        PermitWithoutStream: config.PermitWithoutStream,
    }
}


// from the keepalive package of the grpc library

// EnforcementPolicy is used to set keepalive enforcement policy on the
// server-side. Server will close connection with a client that violates this
// policy.
type EnforcementPolicy struct {
 // MinTime is the minimum amount of time a client should wait before sending
 // a keepalive ping.
 MinTime time.Duration // The current default value is 5 minutes.
 // If true, server allows keepalive pings even when there are no active
 // streams(RPCs). If false, and client sends ping when there are no active
 // streams, server will send GOAWAY and close the connection.
 PermitWithoutStream bool // false by default.
}
// ./internal/core/server/grpc/grpc_server.go

func buildKeepaliveParams(config keepalive.ServerParameters) keepalive.ServerParameters {
    return keepalive.ServerParameters{
        MaxConnectionIdle:     config.MaxConnectionIdle * time.Second,
        MaxConnectionAge:      config.MaxConnectionAge * time.Second,
        MaxConnectionAgeGrace: config.MaxConnectionAgeGrace * time.Second,
        Time:                  config.Time * time.Second,
        Timeout:               config.Timeout * time.Second,
    }
}


// from the keepalive package of the grpc library

// ServerParameters is used to set keepalive and max-age parameters on the
// server-side.
type ServerParameters struct {
 // MaxConnectionIdle is a duration for the amount of time after which an
 // idle connection would be closed by sending a GoAway. Idleness duration is
 // defined since the most recent time the number of outstanding RPCs became
 // zero or the connection establishment.
 MaxConnectionIdle time.Duration // The current default value is infinity.
 // MaxConnectionAge is a duration for the maximum amount of time a
 // connection may exist before it will be closed by sending a GoAway. A
 // random jitter of +/-10% will be added to MaxConnectionAge to spread out
 // connection storms.
 MaxConnectionAge time.Duration // The current default value is infinity.
 // MaxConnectionAgeGrace is an additive period after MaxConnectionAge after
 // which the connection will be forcibly closed.
 MaxConnectionAgeGrace time.Duration // The current default value is infinity.
 // After a duration of this time if the server doesn't see any activity it
 // pings the client to see if the transport is still alive.
 // If set below 1s, a minimum value of 1s will be used instead.
 Time time.Duration // The current default value is 2 hours.
 // After having pinged for keepalive check, the server waits for a duration
 // of Timeout and if no activity is seen even after that the connection is
 // closed.
 Timeout time.Duration // The current default value is 20 seconds.
}
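
The multiplication by time.Second in both builders works because the configuration stores plain second counts in time.Duration fields: a bare 10 is only 10 nanoseconds until it is scaled. Here is a minimal sketch of that conversion. The helper name secondsToDuration is hypothetical and not part of the original code.

```go
package main

import (
	"fmt"
	"time"
)

// secondsToDuration is a hypothetical helper (not in the original code).
// It treats the raw numeric value of d as a count of seconds, mirroring
// the `config.X * time.Second` expressions in the builders above.
func secondsToDuration(d time.Duration) time.Duration {
	return d * time.Second
}

func main() {
	raw := time.Duration(10) // what a config literal of 10 actually holds

	fmt.Println(raw)                    // the unscaled value, in nanoseconds
	fmt.Println(secondsToDuration(raw)) // the value the builders pass to gRPC
	fmt.Println(secondsToDuration(0))   // zero stays zero after scaling
}
```

A value that is already a real duration (for example 10 * time.Second) must not be passed through this conversion again, otherwise it would be scaled twice.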

The Start method initializes the server by creating a TCP listener, registering services and starting to serve incoming requests.

// ./internal/core/server/grpc/grpc_server.go

func (g gRPCServer) Start(serviceRegister func(server *grpc.Server)) {
    grpcListener, err := net.Listen("tcp", ":"+strconv.Itoa(int(g.config.Port)))
    if err != nil {
        zap.L().Fatal("failed to start grpc server", zap.Any("err", err))
    }

    serviceRegister(g.grpcServer)

    zap.L().Info("start grpc server success ", zap.Any("endpoint", grpcListener.Addr()))
    if err := g.grpcServer.Serve(grpcListener); err != nil {
        zap.L().Fatal("failed to grpc server serve", zap.Any("err", err))
    }
}

The Close method triggers a graceful shutdown of the gRPC server, ensuring ongoing requests are completed before termination. It also prevents the server from accepting new connections or requests during this process.

// ./internal/core/server/grpc/grpc_server.go

func (g gRPCServer) Close() error {
    g.grpcServer.GracefulStop()
    return nil
}
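
GracefulStop blocks until all pending RPCs have finished, so a long-lived stream could delay shutdown for a long time. One common mitigation is to bound the wait. The sketch below uses only the standard library to show the idea. The names closeWithTimeout, slowCloser, errCloseTimeout, and demo are hypothetical. In a real server, the fallback after a timeout would typically be a call to the server's Stop method.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"time"
)

// errCloseTimeout is returned when Close does not finish in time.
var errCloseTimeout = errors.New("close timed out")

// closeWithTimeout runs c.Close in a goroutine and waits at most d for it.
func closeWithTimeout(c io.Closer, d time.Duration) error {
	// Buffered so the goroutine can still send and exit after a timeout.
	done := make(chan error, 1)
	go func() { done <- c.Close() }()

	select {
	case err := <-done:
		return err
	case <-time.After(d):
		return errCloseTimeout
	}
}

// slowCloser simulates a resource whose Close blocks for a while.
type slowCloser struct{ delay time.Duration }

func (s slowCloser) Close() error {
	time.Sleep(s.delay)
	return nil
}

// demo closes one resource that finishes in time and one that does not.
func demo() (fast, slow error) {
	fast = closeWithTimeout(slowCloser{delay: time.Millisecond}, time.Second)
	slow = closeWithTimeout(slowCloser{delay: 300 * time.Millisecond}, 10*time.Millisecond)
	return fast, slow
}

func main() {
	fast, slow := demo()
	fmt.Println("fast closer:", fast)
	fmt.Println("slow closer:", slow)
}
```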

In the code above, I utilized the Uber-go Zap library for service logging. Stay tuned for our next blog, where we’ll explore the features and usage of this library, enhancing the logging capabilities of our gRPC server.

Now, let’s move on to another important topic in our blog: implementing graceful shutdown.

Building a graceful shutdown

We built the AddShutdownHook function to handle graceful shutdown for service. Here’s a breakdown of the code:

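// ./internal/core/server/shutdown_hook.go

package server

import (
 "errors"
 "io"
 "log"
 "os"
 "os/signal"
 "syscall"

 "go.uber.org/zap"
)

// AddShutdownHook blocks until a termination signal arrives, then closes
// every closer in the order given and flushes the logger.
func AddShutdownHook(closers ...io.Closer) {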
 zap.L().Info("listening signals...")
 c := make(chan os.Signal, 1)
 signal.Notify(
  c, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM,
 )

 <-c
 zap.L().Info("graceful shutdown...")

 for _, closer := range closers {
  if err := closer.Close(); err != nil {
   zap.L().Error("failed to stop closer", zap.Any("err", err))
  }
 }

 zap.L().Info("completed graceful shutdown")

 if err := zap.L().Sync(); err != nil {
  if !errors.Is(err, syscall.ENOTTY) {
   log.Printf("failed to flush logger err=%v\n", err)
  }
 }
}

Here's an analysis of the key components:

Signal Handling:

  • The function starts by creating a channel (c) to receive signals.

  • It registers the channel to listen for specific signals (e.g., os.Interrupt, syscall.SIGINT) using the signal.Notify function.

We listen on signals:

  • os.Interrupt: Defined by the os package as syscall.SIGINT; the signal for interrupting a process, often triggered by Ctrl+C in the terminal.

  • syscall.SIGHUP: Historically used when a terminal closed, now often used for reloading configurations.

  • syscall.SIGINT: The same signal as os.Interrupt, so listing both is redundant but harmless.

  • syscall.SIGQUIT: Signal for creating a core dump before terminating, triggered by Ctrl+\.

  • syscall.SIGTERM: Generic termination signal, allowing a process to perform cleanup before shutting down.

Graceful Shutdown Trigger:

  • The function waits for a signal to be received on the channel (<-c) before proceeding with graceful shutdown.

Closing Resources:

  • This is the most important step: the function iterates through the provided io.Closer instances (closers) and calls the Close method of each one.

Logger Synchronization:

  • Finally, it attempts to sync the logger to ensure all logs are flushed before program termination.
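
The flow above (wait for a signal, then close resources in the order they were passed) can be exercised without sending a real OS signal by injecting the channel. This is a minimal sketch, not the author's implementation. The names waitAndClose, namedCloser, and demo are hypothetical, and logging is replaced by a recorded list.

```go
package main

import (
	"fmt"
	"io"
	"os"
)

// namedCloser records its name in a shared log when it is closed.
type namedCloser struct {
	name string
	log  *[]string
}

func (n namedCloser) Close() error {
	*n.log = append(*n.log, n.name)
	return nil
}

// waitAndClose blocks until a signal arrives on sig, then closes every
// closer in argument order and returns the errors it collected.
func waitAndClose(sig <-chan os.Signal, closers ...io.Closer) []error {
	<-sig
	var errs []error
	for _, c := range closers {
		if err := c.Close(); err != nil {
			errs = append(errs, err)
		}
	}
	return errs
}

// demo simulates Ctrl+C and returns the order in which resources closed.
func demo() []string {
	var order []string
	sig := make(chan os.Signal, 1)
	sig <- os.Interrupt // stand-in for a real signal delivered by the OS

	waitAndClose(sig,
		namedCloser{name: "grpc-server", log: &order},
		namedCloser{name: "database", log: &order},
	)
	return order
}

func main() {
	fmt.Println(demo())
}
```

The argument order matters: closing the server before the database lets in-flight requests finish while their database connections are still available.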

We are responsible for implementing the io.Closer interface for every resource that must be released: servers, database connections, scheduled or background jobs, caches, queues, files, internal and external client connections, temporary data, and so on. This ensures proper cleanup and contributes to a well-managed and efficient application lifecycle. In this example, I provide the close mechanism for the gRPC server and the database.
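
Not every resource comes with a Close method. A small adapter type, in the spirit of http.HandlerFunc, lets a plain function satisfy io.Closer so it can be passed to the shutdown hook. This is a sketch under assumptions: closerFunc and demo are hypothetical names, and the background job is a stand-in for a real worker.

```go
package main

import (
	"context"
	"fmt"
	"io"
)

// closerFunc adapts an ordinary function to the io.Closer interface.
type closerFunc func() error

func (f closerFunc) Close() error { return f() }

// demo starts a background job, stops it through an io.Closer, and
// reports whether the job had exited by the time Close returned.
func demo() bool {
	ctx, cancel := context.WithCancel(context.Background())
	stopped := make(chan struct{})

	go func() {
		<-ctx.Done() // a real job would do periodic work until cancelled
		close(stopped)
	}()

	var jobCloser io.Closer = closerFunc(func() error {
		cancel()
		<-stopped // wait until the job has actually exited
		return nil
	})

	err := jobCloser.Close()
	select {
	case <-stopped:
		return err == nil
	default:
		return false
	}
}

func main() {
	fmt.Println("job stopped cleanly:", demo())
}
```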

Building a gRPC controller

Similarly to HTTP controllers, implementing gRPC controllers (primary adapters) is essential for handling incoming gRPC requests. In the provided Go code, the userController serves as the primary adapter for managing user-related gRPC operations. This controller facilitates the conversion between gRPC requests and internal representations.

// ./internal/controller/grpc/controller.go

package grpc

import (
 "context"

 proto "user-service/api"
 "user-service/internal/core/entity/error_code"
 "user-service/internal/core/model/request"
 "user-service/internal/core/model/response"
 "user-service/internal/core/port/service"
)

var errorCodeMapper = map[error_code.ErrorCode]proto.ErrorCode{
 error_code.Success:        proto.ErrorCode_SUCCESS,
 error_code.InternalError:  proto.ErrorCode_EC_UNKNOWN,
 error_code.InvalidRequest: proto.ErrorCode_INVALID_REQUEST,
 error_code.DuplicateUser:  proto.ErrorCode_DUPLICATE_USER,
}

type userController struct {
 userService service.UserService
}

func NewUserController(userService service.UserService) proto.UserServiceServer {
 return &userController{
  userService: userService,
 }
}

func (u userController) SignUp(
 ctx context.Context, request *proto.SignUpRequest,
) (*proto.SignUpResponse, error) {
 resp := u.userService.SignUp(u.newSignUpRequest(request))
 return u.newSignUpResponse(resp)
}

func (u userController) newSignUpRequest(protoRequest *proto.SignUpRequest) *request.SignUpRequest {
 return &request.SignUpRequest{
  Username: protoRequest.GetUserName(),
  Password: protoRequest.GetPassword(),
 }
}

func (u userController) newSignUpResponse(resp *response.Response) (
 *proto.SignUpResponse, error,
) {
 if !resp.Status {
  return &proto.SignUpResponse{
   Status:       resp.Status,
   ErrorCode:    u.mapErrorCode(resp.ErrorCode),
   ErrorMessage: resp.ErrorMessage,
  }, nil
 }

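 // Use the comma-ok form so an unexpected payload type cannot panic.
 data, ok := resp.Data.(response.SignUpDataResponse)
 if !ok {
  return &proto.SignUpResponse{
   Status:    false,
   ErrorCode: proto.ErrorCode_EC_UNKNOWN,
  }, nil
 }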
 return &proto.SignUpResponse{
  Status:       resp.Status,
  ErrorCode:    u.mapErrorCode(resp.ErrorCode),
  ErrorMessage: resp.ErrorMessage,
  DisplayName:  data.DisplayName,
 }, nil
}

func (u userController) mapErrorCode(errCode error_code.ErrorCode) proto.ErrorCode {
 code, existed := errorCodeMapper[errCode]
 if existed {
  return code
 }

 return proto.ErrorCode_EC_UNKNOWN
}

We need to implement some components:

  • errorCodeMapper: maps internal error codes to the error codes defined in the Protobuf file.

  • userController: implements proto.UserServiceServer.

  • SignUp function: Handles user registration gRPC requests.

  • newSignUpRequest and newSignUpResponse: convert between gRPC and core service formats.

Building a runner

Finally, this main function initiates the gRPC server and uses the AddShutdownHook function for graceful shutdown.

// ./cmd/grpc/runner.go

package main

import (
   "log"

   "go.uber.org/zap"
   googleGrpc "google.golang.org/grpc"
   "google.golang.org/grpc/keepalive"
   proto "user-service/api"
   grpcCtrl "user-service/internal/controller/grpc"
   "user-service/internal/core/config"
   "user-service/internal/core/server"
   "user-service/internal/core/server/grpc"
   "user-service/internal/core/service"
   infraConf "user-service/internal/infra/config"
   "user-service/internal/infra/repository"
)

func main() {
   // Initialize logger
   logger, err := zap.NewProduction()
   if err != nil {
      log.Fatalf("failed to new logger err=%s\n", err.Error())
   }
   undo := zap.ReplaceGlobals(logger)
   defer undo()

   // Initialize the database connection
   db, err := repository.NewDB(
      infraConf.DatabaseConfig{
         Driver: "mysql",
         Url:                     "user:password@tcp(127.0.0.1:3306)/your_database_name?charset=utf8mb4&parseTime=true&loc=UTC&tls=false&readTimeout=3s&writeTimeout=3s&timeout=3s&clientFoundRows=true",
         ConnMaxLifetimeInMinute: 3,
         MaxOpenConns:            10,
         MaxIdleConns:            1,
      },
   )
   if err != nil {
      log.Fatalf("failed to new database err=%s\n", err.Error())
   }

   // Create the UserRepository
   userRepo := repository.NewUserRepository(db)

   // Create the UserService
   userService := service.NewUserService(userRepo)

   // Create the UserController
   userController := grpcCtrl.NewUserController(userService)

   // Create the gRPC server
   grpcServer, err := grpc.NewGrpcServer(
      config.GrpcServerConfig{
         Port: 9090,
         KeepaliveParams: keepalive.ServerParameters{
            MaxConnectionIdle:     100,
            MaxConnectionAge:      7200,
            MaxConnectionAgeGrace: 60,
            Time:                  10,
            Timeout:               3,
         },
         KeepalivePolicy: keepalive.EnforcementPolicy{
            MinTime:             10,
            PermitWithoutStream: true,
         },
      },
   )
   if err != nil {
      log.Fatalf("failed to new grpc server err=%s\n", err.Error())
   }

   // Start the gRPC server
   go grpcServer.Start(
      func(server *googleGrpc.Server) {
         proto.RegisterUserServiceServer(server, userController)
      },
   )

   // Add a shutdown hook that closes the service's resources on termination
   server.AddShutdownHook(grpcServer, db)
}

The gRPC runner involves a few additional steps compared to the HTTP runner in the previous blog. These include initializing the Zap logger, setting up the gRPC server, registering the userController, and calling the AddShutdownHook function. In the provided example, we ensure the proper release of resources for both the gRPC server and the database.

Earlier, we mentioned the Close function of the database. Closing the database prevents new queries or connections from the service. Additionally, it ensures that all queries that have already started processing on the server are allowed to finish before the closure.

package sql

// Close closes the database and prevents new queries from starting.
// Close then waits for all queries that have started processing on the server
// to finish.
//
// It is rare to Close a DB, as the DB handle is meant to be
// long-lived and shared between many goroutines.
func (db *DB) Close() error {
....
}
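
Because that Close method has the signature Close() error, a *sql.DB already satisfies io.Closer and can be handed to a shutdown hook without a wrapper. The sketch below checks this with the standard library only. The names isCloser and demo are hypothetical, and whether the repository's own DB type exposes Close in the same way is an assumption based on how the runner passes db to AddShutdownHook.

```go
package main

import (
	"database/sql"
	"fmt"
	"io"
)

// Compile-time check: the build fails if *sql.DB stops satisfying io.Closer.
var _ io.Closer = (*sql.DB)(nil)

// isCloser reports whether v could be passed to a hook accepting io.Closer.
func isCloser(v interface{}) bool {
	_, ok := v.(io.Closer)
	return ok
}

// demo checks one type that has a Close method and one that does not.
func demo() (dbOK, intOK bool) {
	// A typed nil pointer is enough here because no method is called on it.
	return isCloser((*sql.DB)(nil)), isCloser(42)
}

func main() {
	dbOK, intOK := demo()
	fmt.Println("*sql.DB is an io.Closer:", dbOK)
	fmt.Println("int is an io.Closer:", intOK)
}
```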

Ensuring the proper release of resources for components in a service after shutdown is not only necessary but also critical for the overall system’s health and reliability. This practice contributes to the graceful handling of shutdown procedures, preventing potential issues and ensuring a smoother operational lifecycle for the entire system.

Conclusion

In this blog, we delved into the implementation of a gRPC server and walked through how to achieve a graceful shutdown. We explored the advantages of using a gRPC server and the significance of implementing a graceful shutdown mechanism. I also provided code examples for the gRPC service and the shutdown hook mechanism; the full source code is available in my GitHub. This approach not only enhances the operational efficiency of the server but also ensures a smooth and reliable shutdown process, contributing to the overall robustness of the system.

In the upcoming blog, we’ll dive into the discussion about interceptors in gRPC services. Stay tuned for insights into this crucial aspect of gRPC.