Skip to content

Commit 1f37f45

Browse files
committed
kdvh: fix join condition and remove dump functions
1) Not all tables have a typeid column. 2) When present, the typeid column has a different type in the data and flag tables. 3) Log a warning when the query does not return any rows.
1 parent 106cef7 commit 1f37f45

File tree

4 files changed

+186
-261
lines changed

4 files changed

+186
-261
lines changed

migrations/kdvh/dump/dump.go

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package dump
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"os"
78
"path/filepath"
@@ -80,15 +81,30 @@ func (table *Table) Dump(pool *pgxpool.Pool, config *Config) {
8081
<-semaphore
8182
}()
8283

83-
err := table.DumpFn(path, element, station, pool)
84-
if err == nil {
85-
log.Info().
86-
Str("table_name", table.TableName).
87-
Str("station", station).
88-
Str("element", element).
89-
Msg("dumped successfully")
84+
logger := log.Logger.With().
85+
Str("table", table.TableName).
86+
Str("station", station).
87+
Str("element", element).Logger()
88+
89+
query := fmt.Sprintf(table.Query, element)
90+
rows, err := pool.Query(context.TODO(), query, station)
91+
if err != nil {
92+
logger.Error().Err(err).Msg("")
93+
return
94+
}
95+
96+
filename := filepath.Join(path, element+".csv")
97+
if err := writeToCsv(filename, rows); err != nil {
98+
if errors.Is(err, EMPTY_QUERY_ERR) {
99+
logger.Warn().Msg(err.Error())
100+
return
101+
}
102+
logger.Error().Err(err).Msg("")
103+
return
90104
}
91105

106+
logger.Info().Msg("dumped successfully")
107+
92108
}()
93109
}
94110
wg.Wait()

migrations/kdvh/dump/dump_functions.go

Lines changed: 0 additions & 220 deletions
This file was deleted.

migrations/kdvh/dump/table.go

Lines changed: 72 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,55 +1,93 @@
11
package dump
22

3-
import "github.com/jackc/pgx/v5/pgxpool"
3+
import (
4+
"github.com/jackc/pgx/v5/pgxpool"
5+
)
46

57
type DumpFunction func(path, element, station, dataTable, flagTable string, pool *pgxpool.Pool) error
68
type Table struct {
7-
TableName string // Name of the DATA table
8-
FlagTableName string // Name of the FLAG table
9-
ElemTableName string // Name of the ELEM table
10-
dumpInner DumpFunction // How to dump a given combo of (element, station) for the given table
9+
TableName string // Name of the DATA table
10+
FlagTableName string // Name of the FLAG table
11+
ElemTableName string // Name of the ELEM table
12+
Query string // Query used to dump from the table
1113
}
1214

13-
func (table *Table) DumpFn(path, element, station string, pool *pgxpool.Pool) error {
14-
return table.dumpInner(path, element, station, table.TableName, table.FlagTableName, pool)
15-
}
15+
func NewTable(data, flag, elem string) *Table {
16+
var query string
17+
18+
// Set the query string
19+
switch data {
20+
case "T_HOMOGEN_MONTH":
21+
// T_HOMOGEN_MONTH contains seasonal and annual data, plus other derivative
22+
// data combining both of these. We decided to dump only the monthly data (season BETWEEN 1 AND 12) for
23+
// - TAM (mean hourly temperature), and
24+
// - RR (hourly precipitation; note that in Stinfosys this parameter is 'RR_1')
25+
//
26+
// We calculate the other data on the fly (outside this program) if needed.
27+
query = "SELECT dato AS time, '' AS typeid, %[1]s AS data, '' AS flag FROM " + data +
28+
" WHERE %[1]s IS NOT NULL AND stnr = $1 AND season BETWEEN 1 AND 12"
29+
30+
case "T_METARDATA":
31+
// Missing Flag table
32+
query = "SELECT dato AS time, typeid, %[1]s AS data, '' AS flag FROM " + data +
33+
" WHERE %[1]s IS NOT NULL AND stnr = $1"
34+
35+
case "T_DIURNAL", "T_MONTH":
36+
// Missing typeid column
37+
query = "SELECT dato AS time, '' AS typeid, d.%[1]s AS data, f.%[1]s AS flag FROM " +
38+
"(SELECT dato, %[1]s FROM " + data + " WHERE %[1]s IS NOT NULL AND stnr = $1) d " +
39+
"FULL OUTER JOIN " +
40+
"(SELECT dato, %[1]s FROM " + flag + " WHERE %[1]s IS NOT NULL AND stnr = $1) f " +
41+
"USING(dato)"
42+
43+
case "T_HOMOGEN_DIURNAL":
44+
// Missing Flag table and typeid column
45+
query = "SELECT dato AS time, '' AS typeid, %[1]s AS data, '' AS flag FROM " + data +
46+
" WHERE %[1]s IS NOT NULL AND stnr = $1"
47+
48+
default:
49+
query = "SELECT dato AS time, COALESCE(d.typeid, f.typeid) AS typeid, d.%[1]s AS data, f.%[1]s AS flag FROM " +
50+
"(SELECT dato, typeid, %[1]s FROM " + data + " WHERE %[1]s IS NOT NULL AND stnr = $1) d " +
51+
"FULL OUTER JOIN " +
52+
"(SELECT dato, typeid, %[1]s FROM " + flag + " WHERE %[1]s IS NOT NULL AND stnr = $1) f " +
53+
"USING(dato)"
54+
}
1655

17-
func NewTable(data, flag, elem string, fn DumpFunction) *Table {
1856
return &Table{
1957
TableName: data,
2058
FlagTableName: flag,
2159
ElemTableName: elem,
22-
dumpInner: fn,
60+
Query: query,
2361
}
2462
}
2563

2664
func InitDump() []*Table {
2765
return []*Table{
2866
// Section 1: tables that need to be migrated entirely
29-
NewTable("T_EDATA", "T_EFLAG", "T_ELEM_EDATA", dumpDataAndFlags),
30-
NewTable("T_METARDATA", "", "T_ELEM_FDATA", dumpDataOnly),
31-
32-
NewTable("T_ADATA", "T_AFLAG", "T_ELEM_OBS", dumpDataAndFlags),
33-
NewTable("T_MDATA", "T_MFLAG", "T_ELEM_OBS", dumpDataAndFlags),
34-
NewTable("T_TJ_DATA", "T_TJ_FLAG", "T_ELEM_OBS", dumpDataAndFlags),
35-
NewTable("T_PDATA", "T_PFLAG", "T_ELEM_OBS", dumpDataAndFlags),
36-
NewTable("T_NDATA", "T_NFLAG", "T_ELEM_OBS", dumpDataAndFlags),
37-
NewTable("T_VDATA", "T_VFLAG", "T_ELEM_OBS", dumpDataAndFlags),
38-
NewTable("T_UTLANDDATA", "T_UTLANDFLAG", "T_ELEM_OBS", dumpDataAndFlags),
39-
40-
NewTable("T_10MINUTE_DATA", "T_10MINUTE_FLAG", "T_ELEM_OBS", dumpDataAndFlags),
41-
NewTable("T_ADATA_LEVEL", "T_AFLAG_LEVEL", "T_ELEM_OBS", dumpDataAndFlags),
42-
NewTable("T_MINUTE_DATA", "T_MINUTE_FLAG", "T_ELEM_OBS", dumpDataAndFlags),
43-
NewTable("T_SECOND_DATA", "T_SECOND_FLAG", "T_ELEM_OBS", dumpDataAndFlags),
44-
NewTable("T_CDCV_DATA", "T_CDCV_FLAG", "T_ELEM_EDATA", dumpDataAndFlags),
45-
NewTable("T_MERMAID", "T_MERMAID_FLAG", "T_ELEM_EDATA", dumpDataAndFlags),
46-
NewTable("T_SVVDATA", "T_SVVFLAG", "T_ELEM_OBS", dumpDataAndFlags),
47-
NewTable("T_AVINOR", "T_AVINOR_FLAG", "T_ELEM_OBS", dumpDataAndFlags),
48-
49-
NewTable("T_MONTH", "T_MONTH_FLAG", "T_ELEM_MONTH", dumpDataAndFlags),
50-
NewTable("T_DIURNAL", "T_DIURNAL_FLAG", "T_ELEM_DIURNAL", dumpDataAndFlags),
51-
NewTable("T_HOMOGEN_DIURNAL", "", "T_ELEM_HOMOGEN_MONTH", dumpDataOnly),
52-
NewTable("T_HOMOGEN_MONTH", "", "T_ELEM_HOMOGEN_MONTH", dumpHomogenMonth),
67+
NewTable("T_EDATA", "T_EFLAG", "T_ELEM_EDATA"),
68+
NewTable("T_METARDATA", "", "T_ELEM_FDATA"),
69+
70+
NewTable("T_ADATA", "T_AFLAG", "T_ELEM_OBS"),
71+
NewTable("T_MDATA", "T_MFLAG", "T_ELEM_OBS"),
72+
NewTable("T_TJ_DATA", "T_TJ_FLAG", "T_ELEM_OBS"),
73+
NewTable("T_PDATA", "T_PFLAG", "T_ELEM_OBS"),
74+
NewTable("T_NDATA", "T_NFLAG", "T_ELEM_OBS"),
75+
NewTable("T_VDATA", "T_VFLAG", "T_ELEM_OBS"),
76+
NewTable("T_UTLANDDATA", "T_UTLANDFLAG", "T_ELEM_OBS"),
77+
78+
NewTable("T_10MINUTE_DATA", "T_10MINUTE_FLAG", "T_ELEM_OBS"),
79+
NewTable("T_ADATA_LEVEL", "T_AFLAG_LEVEL", "T_ELEM_OBS"),
80+
NewTable("T_MINUTE_DATA", "T_MINUTE_FLAG", "T_ELEM_OBS"),
81+
NewTable("T_SECOND_DATA", "T_SECOND_FLAG", "T_ELEM_OBS"),
82+
NewTable("T_CDCV_DATA", "T_CDCV_FLAG", "T_ELEM_EDATA"),
83+
NewTable("T_MERMAID", "T_MERMAID_FLAG", "T_ELEM_EDATA"),
84+
NewTable("T_SVVDATA", "T_SVVFLAG", "T_ELEM_OBS"),
85+
NewTable("T_AVINOR", "T_AVINOR_FLAG", "T_ELEM_OBS"),
86+
87+
NewTable("T_MONTH", "T_MONTH_FLAG", "T_ELEM_MONTH"),
88+
NewTable("T_DIURNAL", "T_DIURNAL_FLAG", "T_ELEM_DIURNAL"),
89+
NewTable("T_HOMOGEN_DIURNAL", "", "T_ELEM_HOMOGEN_MONTH"),
90+
NewTable("T_HOMOGEN_MONTH", "", "T_ELEM_HOMOGEN_MONTH"),
5391

5492
// Section 5: tables missing in the KDVH proxy:
5593
// 1. this one exists in a separate database

0 commit comments

Comments
 (0)