6
6
"errors"
7
7
"fmt"
8
8
"runtime"
9
+ "sync"
9
10
"time"
10
11
11
12
"github.com/pbnjay/memory"
@@ -17,16 +18,56 @@ import (
17
18
"gitlab.com/postgres-ai/database-lab/v3/pkg/models"
18
19
)
19
20
21
+ const errorsSoftLimit = 2
22
+
20
23
// Billing manages the billing data.
21
24
type Billing struct {
22
- platform * platform.Client
23
- props * global.EngineProps
24
- pm * pool.Manager
25
+ platform * platform.Client
26
+ props * global.EngineProps
27
+ pm * pool.Manager
28
+ mu * sync.Mutex
29
+ softFails int
25
30
}
26
31
27
32
// New creates a new Billing struct.
28
33
func New (platform * platform.Client , props * global.EngineProps , pm * pool.Manager ) * Billing {
29
- return & Billing {platform : platform , props : props , pm : pm }
34
+ return & Billing {platform : platform , props : props , pm : pm , mu : & sync.Mutex {}}
35
+ }
36
+
37
+ // Reload updates platform client.
38
+ func (b * Billing ) Reload (platformSvc * platform.Client ) {
39
+ b .platform = platformSvc
40
+ }
41
+
42
+ func (b * Billing ) increaseFailureCounter () int {
43
+ b .mu .Lock ()
44
+ defer b .mu .Unlock ()
45
+
46
+ b .softFails ++
47
+
48
+ return b .softFails
49
+ }
50
+
51
+ func (b * Billing ) softLimitCounter () int {
52
+ b .mu .Lock ()
53
+ defer b .mu .Unlock ()
54
+
55
+ return b .softFails
56
+ }
57
+
58
+ func (b * Billing ) isSoftLimitExceeded () bool {
59
+ b .mu .Lock ()
60
+ defer b .mu .Unlock ()
61
+
62
+ return b .softFails > errorsSoftLimit
63
+ }
64
+
65
+ func (b * Billing ) resetSoftFailureCounter () {
66
+ b .mu .Lock ()
67
+
68
+ b .softFails = 0
69
+
70
+ b .mu .Unlock ()
30
71
}
31
72
32
73
// RegisterInstance registers instance on the Platform.
@@ -44,14 +85,9 @@ func (b *Billing) RegisterInstance(ctx context.Context, systemMetrics models.Sys
44
85
return fmt .Errorf ("cannot register instance: %w" , err )
45
86
}
46
87
47
- if _ , err := b .platform .SendUsage (ctx , b .props , platform.InstanceUsage {
48
- InstanceID : b .props .InstanceID ,
49
- EventData : platform.DataUsage {
50
- CPU : systemMetrics .CPU ,
51
- TotalMemory : systemMetrics .TotalMemory ,
52
- DataSize : systemMetrics .DataUsed ,
53
- }}); err != nil {
54
- return fmt .Errorf ("cannot send the initial usage event: %w" , err )
88
+ // To check billing state immediately.
89
+ if err := b .SendUsage (ctx , systemMetrics ); err != nil {
90
+ return err
55
91
}
56
92
57
93
return nil
@@ -74,20 +110,51 @@ func (b *Billing) CollectUsage(ctx context.Context, system models.System) {
74
110
case <- ticker .C :
75
111
poolStat := b .pm .CollectPoolStat ()
76
112
77
- if _ , err := b .platform .SendUsage (ctx , b .props , platform.InstanceUsage {
78
- InstanceID : b .props .InstanceID ,
79
- EventData : platform.DataUsage {
80
- CPU : system .CPU ,
81
- TotalMemory : system .TotalMemory ,
82
- DataSize : poolStat .TotalUsed ,
83
- },
113
+ if err := b .SendUsage (ctx , models.System {
114
+ CPU : system .CPU ,
115
+ TotalMemory : system .TotalMemory ,
116
+ DataUsed : poolStat .TotalUsed ,
84
117
}); err != nil {
85
- log .Err ("failed to send usage event :" , err )
118
+ log .Err ("collecting usage:" , err )
86
119
}
87
120
}
88
121
}
89
122
}
90
123
124
+ // SendUsage sends usage events.
125
+ func (b * Billing ) SendUsage (ctx context.Context , systemMetrics models.System ) error {
126
+ respData , err := b .platform .SendUsage (ctx , b .props , platform.InstanceUsage {
127
+ InstanceID : b .props .InstanceID ,
128
+ EventData : platform.DataUsage {
129
+ CPU : systemMetrics .CPU ,
130
+ TotalMemory : systemMetrics .TotalMemory ,
131
+ DataSize : systemMetrics .DataUsed ,
132
+ }})
133
+
134
+ if err != nil {
135
+ b .increaseFailureCounter ()
136
+
137
+ if b .isSoftLimitExceeded () {
138
+ log .Msg ("Billing error threshold surpassed. Certain features have been temporarily disabled." )
139
+ b .props .UpdateBilling (false )
140
+ }
141
+
142
+ return fmt .Errorf ("cannot send usage event: %w. Attempts: %d" , err , b .softLimitCounter ())
143
+ }
144
+
145
+ if b .props .BillingActive != respData .BillingActive {
146
+ b .props .UpdateBilling (respData .BillingActive )
147
+
148
+ log .Dbg ("Instance state updated. Billing is active:" , respData .BillingActive )
149
+ }
150
+
151
+ if b .props .BillingActive {
152
+ b .resetSoftFailureCounter ()
153
+ }
154
+
155
+ return nil
156
+ }
157
+
91
158
func (b * Billing ) shouldSendPlatformRequests () error {
92
159
if b .props .Infrastructure == global .AWSInfrastructure {
93
160
return errors .New ("DLE infrastructure is AWS Marketplace" )
0 commit comments