Al-HUWAITI Shell
Al-huwaiti


Server : LiteSpeed
System : Linux in-mum-web1112.main-hosting.eu 4.18.0-553.34.1.lve.el8.x86_64 #1 SMP Thu Jan 9 16:30:32 UTC 2025 x86_64
User : u451330669 ( 451330669)
PHP Version : 8.2.27
Disable Function : NONE
Directory :  /opt/go/pkg/mod/github.com/prometheus/alertmanager@v0.26.0/cluster/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //opt/go/pkg/mod/github.com/prometheus/alertmanager@v0.26.0/cluster/tls_transport.go
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Forked from https://github.com/mxinden/memberlist-tls-transport.

// Implements Transport interface so that all gossip communications occur via TLS over TCP.

package cluster

import (
	"context"
	"crypto/tls"
	"fmt"
	"net"
	"strings"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/hashicorp/go-sockaddr"
	"github.com/hashicorp/memberlist"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	common "github.com/prometheus/common/config"
	"github.com/prometheus/exporter-toolkit/web"
)

const (
	metricNamespace = "alertmanager"
	metricSubsystem = "tls_transport"
	network         = "tcp"
)

// TLSTransport is a Transport implementation that uses TLS over TCP for both
// packet and stream operations.
type TLSTransport struct {
	ctx          context.Context
	cancel       context.CancelFunc
	logger       log.Logger
	bindAddr     string
	bindPort     int
	done         chan struct{}
	listener     net.Listener
	packetCh     chan *memberlist.Packet
	streamCh     chan net.Conn
	connPool     *connectionPool
	tlsServerCfg *tls.Config
	tlsClientCfg *tls.Config

	packetsSent prometheus.Counter
	packetsRcvd prometheus.Counter
	streamsSent prometheus.Counter
	streamsRcvd prometheus.Counter
	readErrs    prometheus.Counter
	writeErrs   *prometheus.CounterVec
}

// NewTLSTransport returns a TLS transport with the given configuration.
// On successful initialization, a tls listener will be created and listening.
// A valid bindAddr is required. If bindPort == 0, the system will assign
// a free port automatically.
func NewTLSTransport(
	ctx context.Context,
	logger log.Logger,
	reg prometheus.Registerer,
	bindAddr string,
	bindPort int,
	cfg *TLSTransportConfig,
) (*TLSTransport, error) {
	if cfg == nil {
		return nil, errors.New("must specify TLSTransportConfig")
	}

	tlsServerCfg, err := web.ConfigToTLSConfig(cfg.TLSServerConfig)
	if err != nil {
		return nil, errors.Wrap(err, "invalid TLS server config")
	}

	tlsClientCfg, err := common.NewTLSConfig(cfg.TLSClientConfig)
	if err != nil {
		return nil, errors.Wrap(err, "invalid TLS client config")
	}

	ip := net.ParseIP(bindAddr)
	if ip == nil {
		return nil, fmt.Errorf("invalid bind address \"%s\"", bindAddr)
	}

	addr := &net.TCPAddr{IP: ip, Port: bindPort}
	listener, err := tls.Listen(network, addr.String(), tlsServerCfg)
	if err != nil {
		return nil, errors.Wrap(err, fmt.Sprintf("failed to start TLS listener on %q port %d", bindAddr, bindPort))
	}

	connPool, err := newConnectionPool(tlsClientCfg)
	if err != nil {
		return nil, errors.Wrap(err, "failed to initialize tls transport connection pool")
	}

	ctx, cancel := context.WithCancel(ctx)
	t := &TLSTransport{
		ctx:          ctx,
		cancel:       cancel,
		logger:       logger,
		bindAddr:     bindAddr,
		bindPort:     bindPort,
		done:         make(chan struct{}),
		listener:     listener,
		packetCh:     make(chan *memberlist.Packet),
		streamCh:     make(chan net.Conn),
		connPool:     connPool,
		tlsServerCfg: tlsServerCfg,
		tlsClientCfg: tlsClientCfg,
	}

	t.registerMetrics(reg)

	go func() {
		t.listen()
		close(t.done)
	}()
	return t, nil
}

// FinalAdvertiseAddr is given the user's configured values (which
// might be empty) and returns the desired IP and port to advertise to
// the rest of the cluster.
func (t *TLSTransport) FinalAdvertiseAddr(ip string, port int) (net.IP, int, error) {
	var advertiseAddr net.IP
	var advertisePort int
	if ip != "" {
		advertiseAddr = net.ParseIP(ip)
		if advertiseAddr == nil {
			return nil, 0, fmt.Errorf("failed to parse advertise address %q", ip)
		}

		if ip4 := advertiseAddr.To4(); ip4 != nil {
			advertiseAddr = ip4
		}
		advertisePort = port
	} else {
		if t.bindAddr == "0.0.0.0" {
			// Otherwise, if we're not bound to a specific IP, let's
			// use a suitable private IP address.
			var err error
			ip, err = sockaddr.GetPrivateIP()
			if err != nil {
				return nil, 0, fmt.Errorf("failed to get interface addresses: %v", err)
			}
			if ip == "" {
				return nil, 0, fmt.Errorf("no private IP address found, and explicit IP not provided")
			}

			advertiseAddr = net.ParseIP(ip)
			if advertiseAddr == nil {
				return nil, 0, fmt.Errorf("failed to parse advertise address: %q", ip)
			}
		} else {
			advertiseAddr = t.listener.Addr().(*net.TCPAddr).IP
		}
		advertisePort = t.GetAutoBindPort()
	}
	return advertiseAddr, advertisePort, nil
}

// PacketCh returns a channel that can be read to receive incoming
// packets from other peers.
func (t *TLSTransport) PacketCh() <-chan *memberlist.Packet {
	return t.packetCh
}

// StreamCh returns a channel that can be read to handle incoming stream
// connections from other peers.
func (t *TLSTransport) StreamCh() <-chan net.Conn {
	return t.streamCh
}

// Shutdown is called when memberlist is shutting down; this gives the
// TLS Transport a chance to clean up the listener and other goroutines.
func (t *TLSTransport) Shutdown() error {
	level.Debug(t.logger).Log("msg", "shutting down tls transport")
	t.cancel()
	err := t.listener.Close()
	t.connPool.shutdown()
	<-t.done
	return err
}

// WriteTo is a packet-oriented interface that borrows a connection
// from the pool, and writes to it. It also returns a timestamp of when
// the packet was written.
func (t *TLSTransport) WriteTo(b []byte, addr string) (time.Time, error) {
	conn, err := t.connPool.borrowConnection(addr, DefaultTCPTimeout)
	if err != nil {
		t.writeErrs.WithLabelValues("packet").Inc()
		return time.Now(), errors.Wrap(err, "failed to dial")
	}
	fromAddr := t.listener.Addr().String()
	err = conn.writePacket(fromAddr, b)
	if err != nil {
		t.writeErrs.WithLabelValues("packet").Inc()
		return time.Now(), errors.Wrap(err, "failed to write packet")
	}
	t.packetsSent.Add(float64(len(b)))
	return time.Now(), nil
}

// DialTimeout is used to create a connection that allows memberlist
// to perform two-way communications with a peer.
func (t *TLSTransport) DialTimeout(addr string, timeout time.Duration) (net.Conn, error) {
	conn, err := dialTLSConn(addr, timeout, t.tlsClientCfg)
	if err != nil {
		t.writeErrs.WithLabelValues("stream").Inc()
		return nil, errors.Wrap(err, "failed to dial")
	}
	err = conn.writeStream()
	netConn := conn.getRawConn()
	if err != nil {
		t.writeErrs.WithLabelValues("stream").Inc()
		return netConn, errors.Wrap(err, "failed to create stream connection")
	}
	t.streamsSent.Inc()
	return netConn, nil
}

// GetAutoBindPort returns the bind port that was automatically given by the system
// if a bindPort of 0 was specified during instantiation.
func (t *TLSTransport) GetAutoBindPort() int {
	return t.listener.Addr().(*net.TCPAddr).Port
}

// listen starts up multiple handlers accepting concurrent connections.
func (t *TLSTransport) listen() {
	for {
		select {
		case <-t.ctx.Done():

			return
		default:
			conn, err := t.listener.Accept()
			if err != nil {
				// The error "use of closed network connection" is returned when the listener is closed.
				// It is not exported in a more reasonable way. See https://github.com/golang/go/issues/4373.
				if strings.Contains(err.Error(), "use of closed network connection") {
					return
				}
				t.readErrs.Inc()
				level.Debug(t.logger).Log("msg", "error accepting connection", "err", err)

			} else {
				go t.handle(conn)
			}
		}
	}
}

func (t *TLSTransport) handle(conn net.Conn) {
	for {
		packet, err := rcvTLSConn(conn).read()
		if err != nil {
			level.Debug(t.logger).Log("msg", "error reading from connection", "err", err)
			t.readErrs.Inc()
			return
		}
		select {
		case <-t.ctx.Done():
			return
		default:
			if packet != nil {
				n := len(packet.Buf)
				t.packetCh <- packet
				t.packetsRcvd.Add(float64(n))
			} else {
				t.streamCh <- conn
				t.streamsRcvd.Inc()
				return
			}
		}
	}
}

func (t *TLSTransport) registerMetrics(reg prometheus.Registerer) {
	t.packetsSent = prometheus.NewCounter(
		prometheus.CounterOpts{
			Namespace: metricNamespace,
			Subsystem: metricSubsystem,
			Name:      "packet_bytes_sent_total",
			Help:      "The number of packet bytes sent to outgoing connections (excluding internal metadata).",
		},
	)
	t.packetsRcvd = prometheus.NewCounter(
		prometheus.CounterOpts{
			Namespace: metricNamespace,
			Subsystem: metricSubsystem,
			Name:      "packet_bytes_received_total",
			Help:      "The number of packet bytes received from incoming connections (excluding internal metadata).",
		},
	)
	t.streamsSent = prometheus.NewCounter(
		prometheus.CounterOpts{
			Namespace: metricNamespace,
			Subsystem: metricSubsystem,
			Name:      "stream_connections_sent_total",
			Help:      "The number of stream connections sent.",
		},
	)

	t.streamsRcvd = prometheus.NewCounter(
		prometheus.CounterOpts{
			Namespace: metricNamespace,
			Subsystem: metricSubsystem,
			Name:      "stream_connections_received_total",
			Help:      "The number of stream connections received.",
		},
	)
	t.readErrs = prometheus.NewCounter(
		prometheus.CounterOpts{
			Namespace: metricNamespace,
			Subsystem: metricSubsystem,
			Name:      "read_errors_total",
			Help:      "The number of errors encountered while reading from incoming connections.",
		},
	)
	t.writeErrs = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: metricNamespace,
			Subsystem: metricSubsystem,
			Name:      "write_errors_total",
			Help:      "The number of errors encountered while writing to outgoing connections.",
		},
		[]string{"connection_type"},
	)

	if reg != nil {
		reg.MustRegister(t.packetsSent)
		reg.MustRegister(t.packetsRcvd)
		reg.MustRegister(t.streamsSent)
		reg.MustRegister(t.streamsRcvd)
		reg.MustRegister(t.readErrs)
		reg.MustRegister(t.writeErrs)
	}
}

Al-HUWAITI Shell