postgresql_controller.go 6.7 KB


  1. /*
  2. Copyright 2023.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package controller
  14. import (
  15. "context"
  16. databasev1 "github.com/iwanhae/nodb/api/v1"
  17. "github.com/iwanhae/nodb/internal/templates"
  18. "github.com/iwanhae/nodb/pkg/broadcaster"
  19. "github.com/pkg/errors"
  20. corev1 "k8s.io/api/core/v1"
  21. apierrors "k8s.io/apimachinery/pkg/api/errors"
  22. "k8s.io/apimachinery/pkg/api/resource"
  23. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  24. "k8s.io/apimachinery/pkg/runtime"
  25. "k8s.io/apimachinery/pkg/types"
  26. "k8s.io/apimachinery/pkg/util/intstr"
  27. ctrl "sigs.k8s.io/controller-runtime"
  28. "sigs.k8s.io/controller-runtime/pkg/client"
  29. "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
  30. "sigs.k8s.io/controller-runtime/pkg/log"
  31. )
  32. const (
  33. NodbFinalizer = "nodb.iwanhae.kr/finalizer"
  34. )
  35. // PostgreSQLReconciler reconciles a PostgreSQL object
  36. type PostgreSQLReconciler struct {
  37. client.Client
  38. Scheme *runtime.Scheme
  39. Broadcaster broadcaster.Broadcaster[runtime.Object]
  40. }
  41. //+kubebuilder:rbac:groups=database.iwanhae.kr,resources=postgresqls,verbs=get;list;watch;create;update;patch;delete
  42. //+kubebuilder:rbac:groups=database.iwanhae.kr,resources=postgresqls/status,verbs=get;update;patch
  43. //+kubebuilder:rbac:groups=database.iwanhae.kr,resources=postgresqls/finalizers,verbs=update
  44. //+kubebuilder:rbac:groups="",resources=pods,verbs=get;create
  45. // Reconcile is part of the main kubernetes reconciliation loop which aims to
  46. // move the current state of the cluster closer to the desired state.
  47. // TODO(user): Modify the Reconcile function to compare the state specified by
  48. // the PostgreSQL object against the actual cluster state, and then
  49. // perform operations to make the cluster state reflect the state specified by
  50. // the user.
  51. //
  52. // For more details, check Reconcile and its Result here:
  53. // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.16.0/pkg/reconcile
  54. func (r *PostgreSQLReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) {
  55. logger := log.FromContext(ctx)
  56. // fetch target
  57. obj := databasev1.PostgreSQL{}
  58. if err := r.Get(ctx, req.NamespacedName, &obj); err != nil {
  59. if client.IgnoreNotFound(err) == nil {
  60. return ctrl.Result{}, nil
  61. }
  62. logger.Error(err, "resource not found")
  63. return ctrl.Result{}, err
  64. }
  65. logger.Info("reconcile", "namespace", obj.Namespace, "name", obj.Name)
  66. // notify
  67. r.Broadcaster.Publish(ctx, &obj)
  68. {
  69. // update status if changed
  70. original := obj.DeepCopy()
  71. defer func() {
  72. status := databasev1.Status_Ready
  73. if obj.Status.Conditions.Pod.IsLowerThan(status) {
  74. status = obj.Status.Conditions.Pod
  75. }
  76. if obj.Status.Conditions.Service.IsLowerThan(status) {
  77. status = obj.Status.Conditions.Service
  78. }
  79. obj.Status.Status = status
  80. err = client.IgnoreNotFound(r.Status().Patch(ctx, &obj, client.MergeFrom(original)))
  81. if err != nil {
  82. logger.Error(err, "failed to update status")
  83. }
  84. }()
  85. }
  86. {
  87. // Just for broadcasting deletion events to eveyone
  88. if controllerutil.AddFinalizer(&obj, NodbFinalizer) {
  89. obj.Status.Conditions.Pod = databasev1.Status_Pending
  90. obj.Status.Conditions.Service = databasev1.Status_Pending
  91. obj.Status.Status = databasev1.Status_Pending
  92. return ctrl.Result{Requeue: true}, r.Update(ctx, &obj)
  93. }
  94. if !obj.DeletionTimestamp.IsZero() {
  95. controllerutil.RemoveFinalizer(&obj, NodbFinalizer)
  96. return ctrl.Result{Requeue: true}, r.Update(ctx, &obj)
  97. }
  98. }
  99. // core logics
  100. if err := r.createOrUpdatePod(ctx, &obj); err != nil {
  101. return ctrl.Result{}, err
  102. }
  103. if err := r.createOrUpdateService(ctx, &obj); err != nil {
  104. return ctrl.Result{}, err
  105. }
  106. return ctrl.Result{}, nil
  107. }
  108. func (r *PostgreSQLReconciler) createOrUpdateService(ctx context.Context, obj *databasev1.PostgreSQL) error {
  109. logger := log.FromContext(ctx)
  110. svc := corev1.Service{
  111. ObjectMeta: metav1.ObjectMeta{Name: obj.Name, Namespace: obj.Namespace},
  112. }
  113. err := r.Client.Get(ctx, types.NamespacedName{
  114. Namespace: obj.Namespace, Name: obj.Name,
  115. }, &svc)
  116. if err != nil && !apierrors.IsNotFound(err) {
  117. // can't handle other than not found error
  118. return err
  119. }
  120. logger.Info("create or update service")
  121. if result, err := controllerutil.CreateOrUpdate(ctx, r.Client, &svc, func() error {
  122. if err := controllerutil.SetOwnerReference(obj, &svc, r.Scheme); err != nil {
  123. return errors.Wrap(err, "failed to set owner reference")
  124. }
  125. svc.ObjectMeta.Labels = map[string]string{
  126. templates.LabelKeyType: templates.LabelValuePostgreSQL,
  127. templates.LabelKeyName: obj.Name,
  128. }
  129. svc.Spec.Selector = map[string]string{
  130. templates.LabelKeyType: templates.LabelValuePostgreSQL,
  131. templates.LabelKeyName: obj.Name,
  132. }
  133. svc.Spec.Ports = []corev1.ServicePort{
  134. {Name: "postgres", Port: 5432, TargetPort: intstr.FromString("postgres"), Protocol: corev1.ProtocolTCP},
  135. }
  136. svc.Spec.Type = corev1.ServiceTypeNodePort
  137. svc.Spec.ExternalTrafficPolicy = corev1.ServiceExternalTrafficPolicyLocal
  138. return nil
  139. }); err != nil {
  140. return err
  141. } else if result != controllerutil.OperationResultNone {
  142. logger.Info("service modified", "result", result)
  143. }
  144. obj.Status.Conditions.Service = databasev1.Status_Ready
  145. return nil
  146. }
  147. func (r *PostgreSQLReconciler) createOrUpdatePod(ctx context.Context, obj *databasev1.PostgreSQL) error {
  148. logger := log.FromContext(ctx)
  149. pod := corev1.Pod{}
  150. if err := r.Client.Get(ctx, types.NamespacedName{
  151. Namespace: obj.Namespace,
  152. Name: obj.Name,
  153. }, &pod); err == nil {
  154. // if found return
  155. logger.Info("ignore already existing pod")
  156. return nil
  157. } else if !apierrors.IsNotFound(err) {
  158. // can't handle other than not found error
  159. return err
  160. }
  161. pod = templates.PostgreSQLPod(templates.PostgreSQLOpts{
  162. Name: obj.Name,
  163. Namespace: obj.Namespace,
  164. Tag: obj.Spec.Version,
  165. User: obj.Spec.User,
  166. Password: obj.Spec.Password,
  167. Database: obj.Spec.Database,
  168. Memory: resource.MustParse("1Gi"),
  169. Owner: obj,
  170. })
  171. logger.Info("create pod", "namespace", pod.Namespace, "name", pod.Name)
  172. if err := r.Client.Create(ctx, &pod); err != nil {
  173. return errors.Wrap(err, "failed to create pod")
  174. }
  175. obj.Status.Conditions.Pod = databasev1.Status_Initializing
  176. return nil
  177. }
  178. // SetupWithManager sets up the controller with the Manager.
  179. func (r *PostgreSQLReconciler) SetupWithManager(mgr ctrl.Manager) error {
  180. return ctrl.NewControllerManagedBy(mgr).
  181. For(&databasev1.PostgreSQL{}).
  182. Complete(r)
  183. }