From b15663243023c68bd4dea3499299e8ba8802ebf2 Mon Sep 17 00:00:00 2001 From: Manav Aggarwal Date: Wed, 3 May 2023 02:47:06 -0400 Subject: [PATCH 1/7] add //nolint:gosec where math/rand is used --- libs/testfactory/txs.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libs/testfactory/txs.go b/libs/testfactory/txs.go index 50da0937c2..d1dbdd2984 100644 --- a/libs/testfactory/txs.go +++ b/libs/testfactory/txs.go @@ -9,7 +9,7 @@ import ( func GenerateRandomlySizedTxs(count, maxSize int) types.Txs { txs := make(types.Txs, count) for i := 0; i < count; i++ { - size := mrand.Intn(maxSize) + size := mrand.Intn(maxSize) //nolint:gosec if size == 0 { size = 1 } @@ -22,7 +22,7 @@ func GenerateRandomTxs(count, size int) types.Txs { txs := make(types.Txs, count) for i := 0; i < count; i++ { tx := make([]byte, size) - _, err := mrand.Read(tx) + _, err := mrand.Read(tx) //nolint:gosec if err != nil { panic(err) } From b9c18b6e8658e8d612025211accce8b44cf46fca Mon Sep 17 00:00:00 2001 From: Manav Aggarwal Date: Fri, 5 May 2023 00:56:38 -0400 Subject: [PATCH 2/7] Fix data race --- da/test/da_test.go | 38 ++++++++++++++++---------------------- 1 file changed, 16 insertions(+), 22 deletions(-) diff --git a/da/test/da_test.go b/da/test/da_test.go index 0f168332f0..c89909f1c7 100644 --- a/da/test/da_test.go +++ b/da/test/da_test.go @@ -3,6 +3,7 @@ package test import ( "context" "encoding/json" + "fmt" "math/rand" "net" "os" @@ -32,10 +33,17 @@ const mockDaBlockTime = 100 * time.Millisecond var testNamespaceID = types.NamespaceID{0, 1, 2, 3, 4, 5, 6, 7} -func TestLifecycle(t *testing.T) { - srv := startMockGRPCServ(t) +func TestMain(m *testing.M) { + srv := startMockGRPCServ() defer srv.GracefulStop() + httpServer := startMockCelestiaNodeServer() + defer httpServer.Stop() + + os.Exit(m.Run()) +} + +func TestLifecycle(t *testing.T) { for _, dalc := range registry.RegisteredClients() { t.Run(dalc, func(t *testing.T) { doTestLifecycle(t, registry.GetClient(dalc)) @@ -57,12 +65,6 @@ func doTestLifecycle(t *testing.T, dalc da.DataAvailabilityLayerClient) { } func TestDALC(t *testing.T) { - grpcServer := startMockGRPCServ(t) - defer grpcServer.GracefulStop() - - httpServer := startMockCelestiaNodeServer(t) - defer httpServer.Stop() - for _, dalc := range registry.RegisteredClients() { t.Run(dalc, func(t *testing.T) { doTestDALC(t, registry.GetClient(dalc)) @@ -128,12 +130,6 @@ func doTestDALC(t *testing.T, dalc da.DataAvailabilityLayerClient) { } func TestRetrieve(t *testing.T) { - grpcServer := startMockGRPCServ(t) - defer grpcServer.GracefulStop() - - httpServer := startMockCelestiaNodeServer(t) - defer httpServer.Stop() - for _, client := range registry.RegisteredClients() { t.Run(client, func(t *testing.T) { dalc := registry.GetClient(client) @@ -145,8 +141,7 @@ func TestRetrieve(t *testing.T) { } } -func startMockGRPCServ(t *testing.T) *grpc.Server { - t.Helper() +func startMockGRPCServ() *grpc.Server { conf := grpcda.DefaultConfig logger := tmlog.NewTMLogger(os.Stdout) @@ -154,7 +149,7 @@ func startMockGRPCServ(t *testing.T) *grpc.Server { srv := mockserv.GetServer(kvStore, conf, []byte(mockDaBlockTime.String()), logger) lis, err := net.Listen("tcp", conf.Host+":"+strconv.Itoa(conf.Port)) if err != nil { - t.Fatal(err) + panic(err) } go func() { _ = srv.Serve(lis) @@ -162,16 +157,15 @@ func startMockGRPCServ(t *testing.T) *grpc.Server { return srv } -func startMockCelestiaNodeServer(t *testing.T) *cmock.Server { - t.Helper() - httpSrv := cmock.NewServer(mockDaBlockTime, test.NewLogger(t)) +func startMockCelestiaNodeServer() *cmock.Server { + httpSrv := cmock.NewServer(mockDaBlockTime, tmlog.NewTMLogger(os.Stdout)) l, err := net.Listen("tcp4", "127.0.0.1:26658") if err != nil { - t.Fatal("failed to create listener for mock celestia-node RPC server", "error", err) + panic(fmt.Errorf("failed to create listener for mock celestia-node RPC server, error: %w", err)) } err = httpSrv.Start(l) if err != nil { - t.Fatal("can't start mock celestia-node RPC server") + panic("can't start mock celestia-node RPC server") } return httpSrv } From a2f6f2f74f84840c9a46972496d479b4e5a594d6 Mon Sep 17 00:00:00 2001 From: Manav Aggarwal Date: Fri, 5 May 2023 02:37:30 -0400 Subject: [PATCH 3/7] fix data race issue too --- p2p/client.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/p2p/client.go b/p2p/client.go index 789fc0932b..ce6ea6ec86 100644 --- a/p2p/client.go +++ b/p2p/client.go @@ -3,6 +3,7 @@ package p2p import ( "context" "fmt" + "os" "strings" "time" @@ -10,6 +11,8 @@ import ( "github.com/libp2p/go-libp2p" dht "github.com/libp2p/go-libp2p-kad-dht" pubsub "github.com/libp2p/go-libp2p-pubsub" + tmlog "github.com/tendermint/tendermint/libs/log" + "github.com/libp2p/go-libp2p/core/crypto" cdiscovery "github.com/libp2p/go-libp2p/core/discovery" "github.com/libp2p/go-libp2p/core/host" @@ -382,13 +385,14 @@ func (c *Client) setupGossiping(ctx context.Context) error { return err } - c.txGossiper, err = NewGossiper(c.host, c.ps, c.getTxTopic(), c.logger, WithValidator(c.txValidator)) + logger := tmlog.NewTMLogger(os.Stdout) + c.txGossiper, err = NewGossiper(c.host, c.ps, c.getTxTopic(), logger, WithValidator(c.txValidator)) if err != nil { return err } go c.txGossiper.ProcessMessages(ctx) - c.headerGossiper, err = NewGossiper(c.host, c.ps, c.getHeaderTopic(), c.logger, WithValidator(c.headerValidator)) + c.headerGossiper, err = NewGossiper(c.host, c.ps, c.getHeaderTopic(), logger, WithValidator(c.headerValidator)) if err != nil { return err } From 35cd66caf89e21259f4ac8466f540b02ac04d5ee Mon Sep 17 00:00:00 2001 From: Manav Aggarwal Date: Fri, 5 May 2023 09:38:02 -0400 Subject: [PATCH 4/7] replace panic with os.exit --- da/test/da_test.go | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/da/test/da_test.go b/da/test/da_test.go index c89909f1c7..69968ff766 100644 --- a/da/test/da_test.go +++ b/da/test/da_test.go @@ -35,12 +35,22 @@ var testNamespaceID = types.NamespaceID{0, 1, 2, 3, 4, 5, 6, 7} func TestMain(m *testing.M) { srv := startMockGRPCServ() - defer srv.GracefulStop() + if srv == nil { + os.Exit(1) + } httpServer := startMockCelestiaNodeServer() - defer httpServer.Stop() + if httpServer == nil { + os.Exit(1) + } + + exitCode := m.Run() + + // teardown servers + srv.GracefulStop() + httpServer.Stop() - os.Exit(m.Run()) + os.Exit(exitCode) } func TestLifecycle(t *testing.T) { @@ -149,7 +159,8 @@ func startMockGRPCServ() *grpc.Server { srv := mockserv.GetServer(kvStore, conf, []byte(mockDaBlockTime.String()), logger) lis, err := net.Listen("tcp", conf.Host+":"+strconv.Itoa(conf.Port)) if err != nil { - panic(err) + fmt.Println(err) + return nil } go func() { _ = srv.Serve(lis) @@ -161,11 +172,13 @@ func startMockCelestiaNodeServer() *cmock.Server { httpSrv := cmock.NewServer(mockDaBlockTime, tmlog.NewTMLogger(os.Stdout)) l, err := net.Listen("tcp4", "127.0.0.1:26658") if err != nil { - panic(fmt.Errorf("failed to create listener for mock celestia-node RPC server, error: %w", err)) + fmt.Println("failed to create listener for mock celestia-node RPC server, error: %w", err) + return nil } err = httpSrv.Start(l) if err != nil { - panic("can't start mock celestia-node RPC server") + fmt.Println("can't start mock celestia-node RPC server") + return nil } return httpSrv } From 1496fb7f374a5828a431c5561d482c2ee2c821dc Mon Sep 17 00:00:00 2001 From: Manav Aggarwal Date: Fri, 5 May 2023 10:04:01 -0400 Subject: [PATCH 5/7] add with to logger --- p2p/client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/p2p/client.go b/p2p/client.go index ce6ea6ec86..979862c9df 100644 --- a/p2p/client.go +++ b/p2p/client.go @@ -386,13 +386,13 @@ func (c *Client) setupGossiping(ctx context.Context) error { } logger := tmlog.NewTMLogger(os.Stdout) - c.txGossiper, err = NewGossiper(c.host, c.ps, c.getTxTopic(), logger, WithValidator(c.txValidator)) + c.txGossiper, err = NewGossiper(c.host, c.ps, c.getTxTopic(), logger.With("module", "tx-gossip"), WithValidator(c.txValidator)) if err != nil { return err } go c.txGossiper.ProcessMessages(ctx) - c.headerGossiper, err = NewGossiper(c.host, c.ps, c.getHeaderTopic(), logger, WithValidator(c.headerValidator)) + c.headerGossiper, err = NewGossiper(c.host, c.ps, c.getHeaderTopic(), logger.With("module", "header-gossip"), WithValidator(c.headerValidator)) if err != nil { return err } From b382f7fd66c6236338f30aa84c06ab5ec500f48f Mon Sep 17 00:00:00 2001 From: Manav Aggarwal Date: Fri, 5 May 2023 12:25:05 -0400 Subject: [PATCH 6/7] use c.logger again --- p2p/client.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/p2p/client.go b/p2p/client.go index 979862c9df..6a10ce1f7c 100644 --- a/p2p/client.go +++ b/p2p/client.go @@ -3,7 +3,6 @@ package p2p import ( "context" "fmt" - "os" "strings" "time" @@ -11,7 +10,6 @@ import ( "github.com/libp2p/go-libp2p" dht "github.com/libp2p/go-libp2p-kad-dht" pubsub "github.com/libp2p/go-libp2p-pubsub" - tmlog "github.com/tendermint/tendermint/libs/log" "github.com/libp2p/go-libp2p/core/crypto" cdiscovery "github.com/libp2p/go-libp2p/core/discovery" @@ -385,14 +383,13 @@ func (c *Client) setupGossiping(ctx context.Context) error { return err } - logger := tmlog.NewTMLogger(os.Stdout) - c.txGossiper, err = NewGossiper(c.host, c.ps, c.getTxTopic(), logger.With("module", "tx-gossip"), WithValidator(c.txValidator)) + c.txGossiper, err = NewGossiper(c.host, c.ps, c.getTxTopic(), c.logger, WithValidator(c.txValidator)) if err != nil { return err } go c.txGossiper.ProcessMessages(ctx) - c.headerGossiper, err = NewGossiper(c.host, c.ps, c.getHeaderTopic(), logger.With("module", "header-gossip"), WithValidator(c.headerValidator)) + c.headerGossiper, err = NewGossiper(c.host, c.ps, c.getHeaderTopic(), c.logger, WithValidator(c.headerValidator)) if err != nil { return err } From 4b6ef57ca54b13beb83cd465239c5420b74a8ed3 Mon Sep 17 00:00:00 2001 From: Manav Aggarwal Date: Fri, 5 May 2023 12:52:01 -0400 Subject: [PATCH 7/7] Use ctx.Done with select before trying to log error in ProcessMessages --- p2p/gossip.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/p2p/gossip.go b/p2p/gossip.go index b1f311191b..206a7fe529 100644 --- a/p2p/gossip.go +++ b/p2p/gossip.go @@ -92,9 +92,14 @@ func (g *Gossiper) Publish(ctx context.Context, data []byte) error { func (g *Gossiper) ProcessMessages(ctx context.Context) { for { _, err := g.sub.Next(ctx) - if err != nil { - g.logger.Error("failed to read message", "error", err) + select { + case <-ctx.Done(): return + default: + if err != nil { + g.logger.Error("failed to read message", "error", err) + return + } } // Logic is handled in validator }