Skip to content

Commit 6f6a640

Browse files
committed
Merge branch '404-data-retrieval-activity' into 'master'
feat: display state and activity of the retrieval process (#404) Closes #404 See merge request postgres-ai/database-lab!574
2 parents 89eac1e + f7507dc commit 6f6a640

File tree

15 files changed

+392
-6
lines changed

15 files changed

+392
-6
lines changed

engine/internal/retrieval/components/components.go

+4
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"context"
1010

1111
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/config"
12+
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/engine/postgres/tools/activity"
1213
)
1314

1415
// JobBuilder builds jobs.
@@ -27,4 +28,7 @@ type JobRunner interface {
2728

2829
// Run starts a job.
2930
Run(ctx context.Context) error
31+
32+
// ReportActivity reports the current job activity.
33+
ReportActivity(context.Context) (*activity.Activity, error)
3034
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package logical
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"fmt"
7+
"strconv"
8+
9+
"github.com/docker/docker/api/types"
10+
"github.com/docker/docker/client"
11+
"github.com/jackc/pgx/v4"
12+
13+
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/engine/postgres/tools"
14+
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/engine/postgres/tools/activity"
15+
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/engine/postgres/tools/db"
16+
"gitlab.com/postgres-ai/database-lab/v3/pkg/config/global"
17+
"gitlab.com/postgres-ai/database-lab/v3/pkg/log"
18+
)
19+
20+
const (
21+
dleRetrieval = "dle_retrieval"
22+
23+
// queryFieldsNum defines the expected number of fields in the query result.
24+
queryFieldsNum = 5
25+
26+
customRecordSeparator = "\\\\"
27+
28+
// statQuery defines the query to get activity of the DLE retrieval.
29+
statQuery = `select
30+
coalesce(usename, ''),
31+
coalesce(extract(epoch from (clock_timestamp() - query_start)),0) as duration,
32+
left(regexp_replace(coalesce(query, ''), E'[ \\t\\r\\n]+', ' ', 'g'),100) as query_cut,
33+
coalesce(wait_event_type, ''), coalesce(wait_event, '')
34+
from pg_stat_activity
35+
where application_name = '` + dleRetrieval + "'"
36+
)
37+
38+
func dbSourceActivity(ctx context.Context, dbCfg Connection) ([]activity.PGEvent, error) {
39+
connStr := db.ConnectionString(dbCfg.Host, strconv.Itoa(dbCfg.Port), dbCfg.Username, dbCfg.DBName, dbCfg.Password)
40+
41+
querier, err := pgx.Connect(ctx, connStr)
42+
if err != nil {
43+
return nil, fmt.Errorf("failed to connect to DB: %w", err)
44+
}
45+
46+
rows, err := querier.Query(ctx, statQuery)
47+
if err != nil {
48+
return nil, fmt.Errorf("failed to perform query to get DB activity: %w", err)
49+
}
50+
51+
pgeList := make([]activity.PGEvent, 0)
52+
53+
for rows.Next() {
54+
var pge activity.PGEvent
55+
if err := rows.Scan(&pge.User, &pge.Duration, &pge.Query, &pge.WaitEventType, &pge.WaitEvent); err != nil {
56+
return nil, fmt.Errorf("failed to scan the next row to the PG Activity result set: %w", err)
57+
}
58+
59+
pgeList = append(pgeList, pge)
60+
}
61+
62+
if err := rows.Err(); err != nil {
63+
return nil, err
64+
}
65+
66+
return pgeList, nil
67+
}
68+
69+
func pgContainerActivity(ctx context.Context, docker *client.Client, containerID string, db global.Database) ([]activity.PGEvent, error) {
70+
ins, err := docker.ContainerInspect(ctx, containerID)
71+
if err != nil {
72+
return nil, fmt.Errorf("failed to check PG container activity: %w", err)
73+
}
74+
75+
if ins.State.Health.Status != types.Healthy {
76+
log.Dbg("Target container is not ready yet: ", ins.State.Health.Status)
77+
return []activity.PGEvent{}, nil
78+
}
79+
80+
activityCmd := []string{"psql", "-U", db.User(), "-d", db.Name(), "--record-separator='" + customRecordSeparator + "'", "-XAtc", statQuery}
81+
82+
log.Msg("Running activity command: ", activityCmd)
83+
84+
execCfg := types.ExecConfig{
85+
Tty: true,
86+
Cmd: activityCmd,
87+
}
88+
89+
out, err := tools.ExecCommandWithOutput(ctx, docker, containerID, execCfg)
90+
91+
if err != nil {
92+
log.Dbg("Activity command failed:", out)
93+
return nil, err
94+
}
95+
96+
return parseStatActivity(out), nil
97+
}
98+
99+
func parseStatActivity(queryResult string) []activity.PGEvent {
100+
activities := make([]activity.PGEvent, 0)
101+
102+
// Cut out line breaks from the psql output and split records by the custom separator.
103+
lines := bytes.Split(bytes.ReplaceAll([]byte(queryResult), []byte("\r\n"), []byte("")), []byte(customRecordSeparator))
104+
105+
for _, line := range lines {
106+
byteLine := bytes.TrimSpace(line)
107+
108+
if len(byteLine) == 0 {
109+
continue
110+
}
111+
112+
fields := bytes.Split(byteLine, []byte("|"))
113+
114+
if fieldsLen := len(fields); fieldsLen != queryFieldsNum {
115+
log.Dbg(fmt.Sprintf("an invalid activity line given: %d fields are available, but requires %d",
116+
fieldsLen, queryFieldsNum), fields)
117+
continue
118+
}
119+
120+
var queryDuration float64
121+
122+
if durationString := string(fields[1]); durationString != "" {
123+
parsedDuration, err := strconv.ParseFloat(durationString, 64)
124+
if err != nil {
125+
log.Dbg("Cannot parse query duration:", durationString)
126+
}
127+
128+
queryDuration = parsedDuration
129+
}
130+
131+
activities = append(activities, activity.PGEvent{
132+
User: string(fields[0]),
133+
Duration: queryDuration,
134+
Query: string(fields[2]),
135+
WaitEventType: string(fields[3]),
136+
WaitEvent: string(fields[4]),
137+
})
138+
}
139+
140+
return activities
141+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package logical
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
8+
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/engine/postgres/tools/activity"
9+
)
10+
11+
const psqlTestResult = "|||Activity|AutoVacuumMain\\\\postgres|||Activity|LogicalLauncherMain\\\\" +
12+
"john|0.083282|SET TRANSACTION ISOLATION LEVEL REPEATABLE READ, READ ONLY|Client|ClientRead\\\\" +
13+
"john|6.941369|COPY public.pgbench_accounts (aid, bid, abalance, filler) TO stdout;|Client|ClientWrite\\\\" +
14+
"postgres|1.351549|COPY public.pgbench_accounts (aid, bid, abalance, filler) FROM\r\n stdin; ||\\\\" +
15+
"postgres|10.305525|COPY public.pgbench_accounts (aid, bid, abalance, filler) FRO\r\nM stdin; |Client|ClientRead"
16+
17+
func TestParsingStatActivity(t *testing.T) {
18+
expected := []activity.PGEvent{
19+
{
20+
User: "",
21+
Query: "",
22+
Duration: 0,
23+
WaitEventType: "Activity",
24+
WaitEvent: "AutoVacuumMain",
25+
},
26+
{
27+
User: "postgres",
28+
Query: "",
29+
Duration: 0,
30+
WaitEventType: "Activity",
31+
WaitEvent: "LogicalLauncherMain",
32+
},
33+
{
34+
User: "john",
35+
Query: "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ, READ ONLY",
36+
Duration: 0.083282,
37+
WaitEventType: "Client",
38+
WaitEvent: "ClientRead",
39+
},
40+
{
41+
User: "john",
42+
Query: "COPY public.pgbench_accounts (aid, bid, abalance, filler) TO stdout;",
43+
Duration: 6.941369,
44+
WaitEventType: "Client",
45+
WaitEvent: "ClientWrite",
46+
},
47+
48+
{
49+
User: "postgres",
50+
Query: "COPY public.pgbench_accounts (aid, bid, abalance, filler) FROM stdin; ",
51+
Duration: 1.351549,
52+
WaitEventType: "",
53+
WaitEvent: "",
54+
},
55+
{
56+
User: "postgres",
57+
Query: "COPY public.pgbench_accounts (aid, bid, abalance, filler) FROM stdin; ",
58+
Duration: 10.305525,
59+
WaitEventType: "Client",
60+
WaitEvent: "ClientRead",
61+
},
62+
}
63+
64+
result := parseStatActivity(psqlTestResult)
65+
assert.Equal(t, expected, result)
66+
}

engine/internal/retrieval/engine/postgres/logical/dump.go

+35-4
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@ import (
1717
"github.com/docker/docker/api/types/container"
1818
"github.com/docker/docker/api/types/filters"
1919
"github.com/docker/docker/client"
20-
2120
"github.com/jackc/pgx/v4"
22-
2321
"github.com/pkg/errors"
2422

2523
"gitlab.com/postgres-ai/database-lab/v3/internal/diagnostic"
@@ -28,6 +26,7 @@ import (
2826
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/config"
2927
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/dbmarker"
3028
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/engine/postgres/tools"
29+
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/engine/postgres/tools/activity"
3130
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/engine/postgres/tools/cont"
3231
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/engine/postgres/tools/db"
3332
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/engine/postgres/tools/defaults"
@@ -237,6 +236,33 @@ func (d *DumpJob) Reload(cfg map[string]interface{}) (err error) {
237236
return nil
238237
}
239238

239+
// ReportActivity reports the current job activity.
240+
func (d *DumpJob) ReportActivity(ctx context.Context) (*activity.Activity, error) {
241+
dbConnection := d.config.db
242+
dbConnection.Password = d.getPassword()
243+
244+
pgeList, err := dbSourceActivity(ctx, dbConnection)
245+
if err != nil {
246+
return nil, fmt.Errorf("failed to get source activity: %w", err)
247+
}
248+
249+
jobActivity := &activity.Activity{
250+
Source: pgeList,
251+
}
252+
253+
if d.DumpOptions.Restore.Enabled {
254+
pgEvents, err := pgContainerActivity(ctx, d.dockerClient, d.dumpContainerName(), d.globalCfg.Database)
255+
if err != nil {
256+
log.Err(err)
257+
return jobActivity, fmt.Errorf("failed to get activity for target container: %w", err)
258+
}
259+
260+
jobActivity.Target = pgEvents
261+
}
262+
263+
return jobActivity, nil
264+
}
265+
240266
// Run starts the job.
241267
func (d *DumpJob) Run(ctx context.Context) (err error) {
242268
log.Msg("Run job: ", d.Name())
@@ -364,7 +390,10 @@ func (d *DumpJob) Run(ctx context.Context) (err error) {
364390

365391
log.Msg("Running analyze command: ", analyzeCmd)
366392

367-
if err := tools.ExecCommand(ctx, d.dockerClient, containerID, types.ExecConfig{Cmd: analyzeCmd}); err != nil {
393+
if err := tools.ExecCommand(ctx, d.dockerClient, containerID, types.ExecConfig{
394+
Cmd: analyzeCmd,
395+
Env: []string{"PGAPPNAME=" + dleRetrieval},
396+
}); err != nil {
368397
return errors.Wrap(err, "failed to recalculate statistics after restore")
369398
}
370399

@@ -624,7 +653,9 @@ func (d *DumpJob) getExecEnvironmentVariables() []string {
624653

625654
// Set unlimited statement_timeout for the dump session
626655
// because there is a risk of dump failure due to exceeding the statement_timeout.
627-
execEnvs = append(execEnvs, "PGOPTIONS=-c statement_timeout=0")
656+
//
657+
// PGAPPNAME marks the dumping command to detect retrieval events in pg_stat_activity.
658+
execEnvs = append(execEnvs, "PGOPTIONS=-c statement_timeout=0", "PGAPPNAME="+dleRetrieval)
628659

629660
return execEnvs
630661
}

engine/internal/retrieval/engine/postgres/logical/restore.go

+22-2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/config"
2929
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/dbmarker"
3030
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/engine/postgres/tools"
31+
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/engine/postgres/tools/activity"
3132
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/engine/postgres/tools/cont"
3233
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/engine/postgres/tools/defaults"
3334
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/engine/postgres/tools/health"
@@ -169,6 +170,20 @@ func (r *RestoreJob) Reload(cfg map[string]interface{}) (err error) {
169170
return nil
170171
}
171172

173+
// ReportActivity reports the current job activity.
174+
func (r *RestoreJob) ReportActivity(ctx context.Context) (*activity.Activity, error) {
175+
pgEvents, err := pgContainerActivity(ctx, r.dockerClient, r.restoreContainerName(), r.globalCfg.Database)
176+
if err != nil {
177+
return nil, fmt.Errorf("failed to get activity for target container: %w", err)
178+
}
179+
180+
jobActivity := &activity.Activity{
181+
Target: pgEvents,
182+
}
183+
184+
return jobActivity, nil
185+
}
186+
172187
// Run starts the job.
173188
func (r *RestoreJob) Run(ctx context.Context) (err error) {
174189
log.Msg("Run job: ", r.Name())
@@ -264,7 +279,10 @@ func (r *RestoreJob) Run(ctx context.Context) (err error) {
264279

265280
log.Msg("Running analyze command: ", analyzeCmd)
266281

267-
if err := tools.ExecCommand(ctx, r.dockerClient, containerID, types.ExecConfig{Cmd: analyzeCmd}); err != nil {
282+
if err := tools.ExecCommand(ctx, r.dockerClient, containerID, types.ExecConfig{
283+
Cmd: analyzeCmd,
284+
Env: []string{"PGAPPNAME=" + dleRetrieval},
285+
}); err != nil {
268286
return errors.Wrap(err, "failed to recalculate statistics after restore")
269287
}
270288

@@ -494,7 +512,9 @@ func (r *RestoreJob) restoreDB(ctx context.Context, contID, dbName string, dbDef
494512
log.Msg("Running restore command for "+dbName, restoreCommand)
495513

496514
output, err := tools.ExecCommandWithOutput(ctx, r.dockerClient, contID, types.ExecConfig{
497-
Tty: true, Cmd: restoreCommand,
515+
Tty: true,
516+
Cmd: restoreCommand,
517+
Env: []string{"PGAPPNAME=" + dleRetrieval},
498518
})
499519

500520
if output != "" {

engine/internal/retrieval/engine/postgres/physical/physical.go

+6
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/config"
2828
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/dbmarker"
2929
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/engine/postgres/tools"
30+
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/engine/postgres/tools/activity"
3031
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/engine/postgres/tools/cont"
3132
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/engine/postgres/tools/health"
3233
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/engine/postgres/tools/pgtool"
@@ -160,6 +161,11 @@ func (r *RestoreJob) Reload(cfg map[string]interface{}) (err error) {
160161
return options.Unmarshal(cfg, &r.CopyOptions)
161162
}
162163

164+
// ReportActivity reports the current job activity.
165+
func (r *RestoreJob) ReportActivity(_ context.Context) (*activity.Activity, error) {
166+
return nil, nil
167+
}
168+
163169
// Run starts the job.
164170
func (r *RestoreJob) Run(ctx context.Context) (err error) {
165171
log.Msg("Run job: ", r.Name())

engine/internal/retrieval/engine/postgres/snapshot/logical.go

+6
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/config"
2828
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/dbmarker"
2929
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/engine/postgres/tools"
30+
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/engine/postgres/tools/activity"
3031
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/engine/postgres/tools/cont"
3132
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/engine/postgres/tools/health"
3233
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/engine/postgres/tools/query"
@@ -113,6 +114,11 @@ func (s *LogicalInitial) Reload(cfg map[string]interface{}) (err error) {
113114
return options.Unmarshal(cfg, &s.options)
114115
}
115116

117+
// ReportActivity reports the current job activity.
118+
func (s *LogicalInitial) ReportActivity(_ context.Context) (*activity.Activity, error) {
119+
return nil, nil
120+
}
121+
116122
// Run starts the job.
117123
func (s *LogicalInitial) Run(ctx context.Context) error {
118124
if s.options.PreprocessingScript != "" {

0 commit comments

Comments
 (0)