Ver código fonte

naive api implementation

iwanhae 11 meses atrás
pai
commit
aba564e7fc

+ 14 - 2
cmd/main.go

@@ -34,7 +34,9 @@ import (
 
 	databasev1 "github.com/iwanhae/nodb/api/v1"
 	"github.com/iwanhae/nodb/internal/controller"
+	"github.com/iwanhae/nodb/internal/server"
 	"github.com/iwanhae/nodb/internal/templates"
+	"github.com/iwanhae/nodb/pkg/broadcaster"
 	//+kubebuilder:scaffold:imports
 )
 
@@ -68,6 +70,7 @@ func main() {
 	flag.Parse()
 
 	ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
+	broadcaster := broadcaster.New[runtime.Object](50)
 
 	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
 		Scheme:                 scheme,
@@ -109,8 +112,9 @@ func main() {
 	}
 
 	if err = (&controller.PostgreSQLReconciler{
-		Client: mgr.GetClient(),
-		Scheme: mgr.GetScheme(),
+		Client:      mgr.GetClient(),
+		Scheme:      mgr.GetScheme(),
+		Broadcaster: broadcaster,
 	}).SetupWithManager(mgr); err != nil {
 		setupLog.Error(err, "unable to create controller", "controller", "PostgreSQL")
 		os.Exit(1)
@@ -126,6 +130,14 @@ func main() {
 		os.Exit(1)
 	}
 
+	go func() {
+		e := server.NewServer(server.Server{
+			Client:      mgr.GetClient(),
+			Broadcaster: broadcaster,
+		})
+		e.Logger.Fatal(e.Start(":8888"))
+	}()
+
 	setupLog.Info("starting manager")
 	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
 		setupLog.Error(err, "problem running manager")

+ 10 - 1
go.mod

@@ -3,6 +3,7 @@ module github.com/iwanhae/nodb
 go 1.20
 
 require (
+	github.com/labstack/echo/v4 v4.11.2
 	github.com/onsi/ginkgo/v2 v2.11.0
 	github.com/onsi/gomega v1.27.10
 	github.com/stretchr/testify v1.8.4
@@ -11,7 +12,15 @@ require (
 	sigs.k8s.io/controller-runtime v0.16.0
 )
 
-require github.com/pmezard/go-difflib v1.0.0 // indirect
+require (
+	github.com/labstack/gommon v0.4.0 // indirect
+	github.com/mattn/go-colorable v0.1.13 // indirect
+	github.com/mattn/go-isatty v0.0.19 // indirect
+	github.com/pmezard/go-difflib v1.0.0 // indirect
+	github.com/valyala/bytebufferpool v1.0.0 // indirect
+	github.com/valyala/fasttemplate v1.2.2 // indirect
+	golang.org/x/crypto v0.14.0 // indirect
+)
 
 require (
 	github.com/beorn7/perks v1.0.1 // indirect

+ 23 - 0
go.sum

@@ -83,8 +83,19 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
 github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
 github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
+github.com/labstack/echo/v4 v4.11.2 h1:T+cTLQxWCDfqDEoydYm5kCobjmHwOwcv4OJAPHilmdE=
+github.com/labstack/echo/v4 v4.11.2/go.mod h1:UcGuQ8V6ZNRmSweBIJkPvGfwCMIlFmiqrPqiEBfPYws=
+github.com/labstack/gommon v0.4.0 h1:y7cvthEAEbU0yHOf4axH8ZG2NH8knB9iNSoTO8dyIk8=
+github.com/labstack/gommon v0.4.0/go.mod h1:uW6kP17uPlLJsD3ijUYn3/M5bAxtlZhMI6m3MFxTMTM=
 github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
 github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
+github.com/mattn/go-colorable v0.1.11/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
+github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
+github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
+github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
+github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
+github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
+github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
 github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
 github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
 github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
@@ -127,6 +138,11 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
 github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
 github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
 github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
+github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
+github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
+github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
+github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo=
+github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
 github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
@@ -156,6 +172,7 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
 golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
+golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
 golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e h1:+WEEuIdZHnUeJJmEUjyYC2gfUMj69yZXw17EnHg/otA=
 golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e/go.mod h1:Kr81I6Kryrl9sr8s2FK3vxD90NdsKWRuOIl2O4CvYbA=
 golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
@@ -187,7 +204,12 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w
 golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20211103235746-7861aae1554b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
 golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
@@ -235,6 +257,7 @@ gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
 gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
 gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 k8s.io/api v0.28.0 h1:3j3VPWmN9tTDI68NETBWlDiA9qOiGJ7sdKeufehBYsM=

+ 43 - 14
internal/controller/postgresql_controller.go

@@ -21,6 +21,7 @@ import (
 
 	databasev1 "github.com/iwanhae/nodb/api/v1"
 	"github.com/iwanhae/nodb/internal/templates"
+	"github.com/iwanhae/nodb/pkg/broadcaster"
 	"github.com/pkg/errors"
 	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -35,10 +36,15 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/log"
 )
 
+const (
+	NodbFinalizer = "nodb.iwanhae.kr/finalizer"
+)
+
 // PostgreSQLReconciler reconciles a PostgreSQL object
 type PostgreSQLReconciler struct {
 	client.Client
-	Scheme *runtime.Scheme
+	Scheme      *runtime.Scheme
+	Broadcaster broadcaster.Broadcaster[runtime.Object]
 }
 
 //+kubebuilder:rbac:groups=database.iwanhae.kr,resources=postgresqls,verbs=get;list;watch;create;update;patch;delete
@@ -58,6 +64,7 @@ type PostgreSQLReconciler struct {
 func (r *PostgreSQLReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) {
 	logger := log.FromContext(ctx)
 
+	// fetch target
 	obj := databasev1.PostgreSQL{}
 	if err := r.Get(ctx, req.NamespacedName, &obj); err != nil {
 		if client.IgnoreNotFound(err) == nil {
@@ -66,24 +73,46 @@ func (r *PostgreSQLReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 		logger.Error(err, "resource not found")
 		return ctrl.Result{}, err
 	}
-	original := obj.DeepCopy()
 	logger.Info("reconcile", "namespace", obj.Namespace, "name", obj.Name)
 
-	defer func() {
-		status := databasev1.Status_Ready
-		if obj.Status.Conditions.Pod.IsLowerThan(status) {
-			status = obj.Status.Conditions.Pod
-		}
-		if obj.Status.Conditions.Service.IsLowerThan(status) {
-			status = obj.Status.Conditions.Service
+	// notify
+	r.Broadcaster.Publish(ctx, &obj)
+
+	{
+		// update status if changed
+		original := obj.DeepCopy()
+		defer func() {
+			status := databasev1.Status_Ready
+			if obj.Status.Conditions.Pod.IsLowerThan(status) {
+				status = obj.Status.Conditions.Pod
+			}
+			if obj.Status.Conditions.Service.IsLowerThan(status) {
+				status = obj.Status.Conditions.Service
+			}
+			obj.Status.Status = status
+			err = client.IgnoreNotFound(r.Status().Patch(ctx, &obj, client.MergeFrom(original)))
+			if err != nil {
+				logger.Error(err, "failed to update status")
+			}
+		}()
+	}
+
+	{
+		// Just for broadcasting deletion events to eveyone
+		if controllerutil.AddFinalizer(&obj, NodbFinalizer) {
+			obj.Status.Conditions.Pod = databasev1.Status_Pending
+			obj.Status.Conditions.Service = databasev1.Status_Pending
+			obj.Status.Status = databasev1.Status_Pending
+			return ctrl.Result{Requeue: true}, r.Update(ctx, &obj)
 		}
-		obj.Status.Status = status
-		err = r.Status().Patch(ctx, &obj, client.MergeFrom(original))
-		if err != nil {
-			logger.Error(err, "failed to update status")
+
+		if !obj.DeletionTimestamp.IsZero() {
+			controllerutil.RemoveFinalizer(&obj, NodbFinalizer)
+			return ctrl.Result{Requeue: true}, r.Update(ctx, &obj)
 		}
-	}()
+	}
 
+	// core logics
 	if err := r.createOrUpdatePod(ctx, &obj); err != nil {
 		return ctrl.Result{}, err
 	}

+ 25 - 0
internal/server/init.go

@@ -0,0 +1,25 @@
+package server
+
+import (
+	"net/http"
+
+	"github.com/labstack/echo/v4"
+)
+
+func NewServer(s Server) *echo.Echo {
+	e := echo.New()
+
+	e.Use(func(next echo.HandlerFunc) echo.HandlerFunc {
+		return func(c echo.Context) error {
+			if err := next(c); err != nil {
+				return echo.NewHTTPError(http.StatusInternalServerError, err.Error())
+			}
+			return nil
+		}
+	})
+	e.GET("/api", s.watchUpdates)
+	e.POST("/api/:type", s.create)
+	e.DELETE("/api/:type/:namespace/:name", s.delete)
+
+	return e
+}

+ 89 - 0
internal/server/server.go

@@ -0,0 +1,89 @@
+package server
+
+import (
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"strings"
+
+	databasev1 "github.com/iwanhae/nodb/api/v1"
+	"github.com/iwanhae/nodb/pkg/broadcaster"
+	"github.com/labstack/echo/v4"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+type Server struct {
+	client.Client
+	Broadcaster broadcaster.Broadcaster[runtime.Object]
+}
+
+func (s *Server) delete(c echo.Context) error {
+	ctx := c.Request().Context()
+	dbType := c.Param("type")
+	namespace := c.Param("namespace")
+	name := c.Param("name")
+
+	switch strings.ToLower(dbType) {
+	case "postgresql":
+		return s.Client.Delete(ctx, &databasev1.PostgreSQL{
+			ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name}},
+		)
+	default:
+		return echo.NewHTTPError(http.StatusBadRequest)
+	}
+}
+
+func (s *Server) create(c echo.Context) error {
+	ctx := c.Request().Context()
+	dbType := c.Param("type")
+	switch strings.ToLower(dbType) {
+	case "postgresql":
+		obj := &databasev1.PostgreSQL{}
+		if err := json.NewDecoder(c.Request().Body).Decode(obj); err != nil {
+			return err
+		}
+		return s.Client.Create(ctx, obj)
+	default:
+		return echo.NewHTTPError(http.StatusBadRequest)
+	}
+}
+
+func (s *Server) watchUpdates(c echo.Context) error {
+	ctx := c.Request().Context()
+	ch := s.Broadcaster.Subscribe()
+
+	w := c.Response().Writer
+	w.Header().Set("Content-Type", "text/event-stream")
+	w.Header().Set("Cache-Control", "no-cache")
+	w.Header().Set("Connection", "keep-alive")
+	w.Header().Set("Access-Control-Allow-Origin", "*")
+
+	flusher := w.(http.Flusher)
+	{
+		list := databasev1.PostgreSQLList{}
+		if err := s.Client.List(ctx, &list); err != nil {
+			return err
+		}
+		go func() {
+			for _, item := range list.Items {
+				ch <- &item
+			}
+		}()
+	}
+
+	go func() {
+		<-c.Request().Context().Done()
+		close(ch)
+	}()
+
+	for obj := range ch {
+		fmt.Fprint(w, "data: ")
+		json.NewEncoder(w).Encode(obj)
+		fmt.Fprint(w, "\n\n")
+		flusher.Flush()
+	}
+
+	return nil
+}

+ 2 - 2
pkg/broadcaster/boradcaster_test.go

@@ -9,7 +9,7 @@ import (
 )
 
 func TestNewBroadcaster(t *testing.T) {
-	b := broadcaster.NewBroadcaster[int](50)
+	b := broadcaster.New[int](50)
 	if b == nil {
 		t.Error("NewBroadcaster should not return nil")
 	}
@@ -48,7 +48,7 @@ loopB:
 }
 
 func TestNewBroadcaster_auto_close(t *testing.T) {
-	b := broadcaster.NewBroadcaster[int](50)
+	b := broadcaster.New[int](50)
 	if b == nil {
 		t.Error("NewBroadcaster should not return nil")
 	}

+ 17 - 7
pkg/broadcaster/broadcaster.go

@@ -2,8 +2,10 @@ package broadcaster
 
 import (
 	"context"
+	"fmt"
 	"sync"
 	"sync/atomic"
+	"time"
 )
 
 type Broadcaster[T any] interface {
@@ -22,7 +24,7 @@ type chanMeta[T any] struct {
 	closed atomic.Bool
 }
 
-func NewBroadcaster[T any](bufSize int) Broadcaster[T] {
+func New[T any](bufSize int) Broadcaster[T] {
 	return &broadcaster[T]{
 		mu:       sync.RWMutex{},
 		channels: make([]*chanMeta[T], 0),
@@ -39,12 +41,20 @@ func (b *broadcaster[T]) Publish(ctx context.Context, data T) error {
 		if meta.closed.Load() {
 			continue
 		}
-		select {
-		case meta.ch <- data:
-		default:
-			meta.closed.Store(true)
-			close(meta.ch)
-		}
+		func() {
+			defer func() {
+				if err := recover(); err != nil {
+					meta.closed.Store(true)
+					fmt.Println(err)
+				}
+			}()
+			select {
+			case meta.ch <- data:
+			case <-time.After(100 * time.Millisecond):
+				meta.closed.Store(true)
+				close(meta.ch)
+			}
+		}()
 	}
 
 	return nil