postgresql_controller.go 6.8 KB


  1. /*
  2. Copyright 2023.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package controller
  14. import (
  15. "context"
  16. databasev1 "github.com/iwanhae/nodb/api/v1"
  17. "github.com/iwanhae/nodb/internal/templates"
  18. "github.com/iwanhae/nodb/pkg/broadcaster"
  19. "github.com/pkg/errors"
  20. corev1 "k8s.io/api/core/v1"
  21. apierrors "k8s.io/apimachinery/pkg/api/errors"
  22. "k8s.io/apimachinery/pkg/api/resource"
  23. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  24. "k8s.io/apimachinery/pkg/runtime"
  25. "k8s.io/apimachinery/pkg/types"
  26. "k8s.io/apimachinery/pkg/util/intstr"
  27. ctrl "sigs.k8s.io/controller-runtime"
  28. "sigs.k8s.io/controller-runtime/pkg/client"
  29. "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
  30. "sigs.k8s.io/controller-runtime/pkg/log"
  31. )
  32. const (
  33. NodbFinalizer = "nodb.iwanhae.kr/finalizer"
  34. )
  35. // PostgreSQLReconciler reconciles a PostgreSQL object
  36. type PostgreSQLReconciler struct {
  37. client.Client
  38. Scheme *runtime.Scheme
  39. Broadcaster broadcaster.Broadcaster[runtime.Object]
  40. }
  41. //+kubebuilder:rbac:groups=database.iwanhae.kr,resources=postgresqls,verbs=get;list;watch;create;update;patch;delete
  42. //+kubebuilder:rbac:groups=database.iwanhae.kr,resources=postgresqls/status,verbs=get;update;patch
  43. //+kubebuilder:rbac:groups=database.iwanhae.kr,resources=postgresqls/finalizers,verbs=update
  44. //+kubebuilder:rbac:groups="",resources=pods,verbs=get;create
  45. //+kubebuilder:rbac:groups="",resources=services,verbs=get;create
  46. // Reconcile is part of the main kubernetes reconciliation loop which aims to
  47. // move the current state of the cluster closer to the desired state.
  48. // TODO(user): Modify the Reconcile function to compare the state specified by
  49. // the PostgreSQL object against the actual cluster state, and then
  50. // perform operations to make the cluster state reflect the state specified by
  51. // the user.
  52. //
  53. // For more details, check Reconcile and its Result here:
  54. // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.16.0/pkg/reconcile
  55. func (r *PostgreSQLReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) {
  56. logger := log.FromContext(ctx)
  57. // fetch target
  58. obj := databasev1.PostgreSQL{}
  59. if err := r.Get(ctx, req.NamespacedName, &obj); err != nil {
  60. if client.IgnoreNotFound(err) == nil {
  61. return ctrl.Result{}, nil
  62. }
  63. logger.Error(err, "resource not found")
  64. return ctrl.Result{}, err
  65. }
  66. logger.Info("reconcile", "namespace", obj.Namespace, "name", obj.Name)
  67. // notify
  68. r.Broadcaster.Publish(ctx, &obj)
  69. {
  70. // update status if changed
  71. original := obj.DeepCopy()
  72. defer func() {
  73. status := databasev1.Status_Ready
  74. if obj.Status.Conditions.Pod.IsLowerThan(status) {
  75. status = obj.Status.Conditions.Pod
  76. }
  77. if obj.Status.Conditions.Service.IsLowerThan(status) {
  78. status = obj.Status.Conditions.Service
  79. }
  80. obj.Status.Status = status
  81. err = client.IgnoreNotFound(r.Status().Patch(ctx, &obj, client.MergeFrom(original)))
  82. if err != nil {
  83. logger.Error(err, "failed to update status")
  84. }
  85. }()
  86. }
  87. {
  88. // Just for broadcasting deletion events to eveyone
  89. if controllerutil.AddFinalizer(&obj, NodbFinalizer) {
  90. obj.Status.Conditions.Pod = databasev1.Status_Pending
  91. obj.Status.Conditions.Service = databasev1.Status_Pending
  92. obj.Status.Status = databasev1.Status_Pending
  93. return ctrl.Result{Requeue: true}, r.Update(ctx, &obj)
  94. }
  95. if !obj.DeletionTimestamp.IsZero() {
  96. controllerutil.RemoveFinalizer(&obj, NodbFinalizer)
  97. return ctrl.Result{Requeue: true}, r.Update(ctx, &obj)
  98. }
  99. }
  100. // core logics
  101. if err := r.createOrUpdatePod(ctx, &obj); err != nil {
  102. return ctrl.Result{}, err
  103. }
  104. if err := r.createOrUpdateService(ctx, &obj); err != nil {
  105. return ctrl.Result{}, err
  106. }
  107. return ctrl.Result{}, nil
  108. }
  109. func (r *PostgreSQLReconciler) createOrUpdateService(ctx context.Context, obj *databasev1.PostgreSQL) error {
  110. logger := log.FromContext(ctx)
  111. svc := corev1.Service{
  112. ObjectMeta: metav1.ObjectMeta{Name: obj.Name, Namespace: obj.Namespace},
  113. }
  114. err := r.Client.Get(ctx, types.NamespacedName{
  115. Namespace: obj.Namespace, Name: obj.Name,
  116. }, &svc)
  117. if err != nil && !apierrors.IsNotFound(err) {
  118. // can't handle other than not found error
  119. return err
  120. }
  121. logger.Info("create or update service")
  122. if result, err := controllerutil.CreateOrUpdate(ctx, r.Client, &svc, func() error {
  123. if err := controllerutil.SetOwnerReference(obj, &svc, r.Scheme); err != nil {
  124. return errors.Wrap(err, "failed to set owner reference")
  125. }
  126. svc.ObjectMeta.Labels = map[string]string{
  127. templates.LabelKeyType: templates.LabelValuePostgreSQL,
  128. templates.LabelKeyName: obj.Name,
  129. }
  130. svc.Spec.Selector = map[string]string{
  131. templates.LabelKeyType: templates.LabelValuePostgreSQL,
  132. templates.LabelKeyName: obj.Name,
  133. }
  134. svc.Spec.Ports = []corev1.ServicePort{
  135. {Name: "postgres", Port: 5432, TargetPort: intstr.FromString("postgres"), Protocol: corev1.ProtocolTCP},
  136. }
  137. svc.Spec.Type = corev1.ServiceTypeNodePort
  138. svc.Spec.ExternalTrafficPolicy = corev1.ServiceExternalTrafficPolicyLocal
  139. return nil
  140. }); err != nil {
  141. return err
  142. } else if result != controllerutil.OperationResultNone {
  143. logger.Info("service modified", "result", result)
  144. }
  145. obj.Status.Conditions.Service = databasev1.Status_Ready
  146. return nil
  147. }
  148. func (r *PostgreSQLReconciler) createOrUpdatePod(ctx context.Context, obj *databasev1.PostgreSQL) error {
  149. logger := log.FromContext(ctx)
  150. pod := corev1.Pod{}
  151. if err := r.Client.Get(ctx, types.NamespacedName{
  152. Namespace: obj.Namespace,
  153. Name: obj.Name,
  154. }, &pod); err == nil {
  155. // if found return
  156. logger.Info("ignore already existing pod")
  157. return nil
  158. } else if !apierrors.IsNotFound(err) {
  159. // can't handle other than not found error
  160. return err
  161. }
  162. pod = templates.PostgreSQLPod(templates.PostgreSQLOpts{
  163. Name: obj.Name,
  164. Namespace: obj.Namespace,
  165. Tag: obj.Spec.Version,
  166. User: obj.Spec.User,
  167. Password: obj.Spec.Password,
  168. Database: obj.Spec.Database,
  169. Memory: resource.MustParse("1Gi"),
  170. Owner: obj,
  171. })
  172. logger.Info("create pod", "namespace", pod.Namespace, "name", pod.Name)
  173. if err := r.Client.Create(ctx, &pod); err != nil {
  174. return errors.Wrap(err, "failed to create pod")
  175. }
  176. obj.Status.Conditions.Pod = databasev1.Status_Initializing
  177. return nil
  178. }
  179. // SetupWithManager sets up the controller with the Manager.
  180. func (r *PostgreSQLReconciler) SetupWithManager(mgr ctrl.Manager) error {
  181. return ctrl.NewControllerManagedBy(mgr).
  182. For(&databasev1.PostgreSQL{}).
  183. Complete(r)
  184. }