diff --git a/db/src/go.mod b/db/src/go.mod index 6a4804c..a080fe6 100644 --- a/db/src/go.mod +++ b/db/src/go.mod @@ -1,33 +1,58 @@ module gateway -go 1.24.0 +go 1.25.0 require ( github.com/InfluxCommunity/influxdb3-go/v2 v2.13.0 github.com/eclipse/paho.mqtt.golang v1.5.1 + github.com/gin-gonic/gin v1.12.0 github.com/stretchr/testify v1.11.1 + go.uber.org/mock v0.6.0 ) require ( github.com/apache/arrow-go/v18 v18.5.1 // indirect + github.com/bytedance/gopkg v0.1.3 // indirect + github.com/bytedance/sonic v1.15.0 // indirect + github.com/bytedance/sonic/loader v0.5.0 // indirect + github.com/cloudwego/base64x v0.1.6 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/gabriel-vasile/mimetype v1.4.12 // indirect + github.com/gin-contrib/sse v1.1.0 // indirect + github.com/go-playground/locales v0.14.1 // indirect + github.com/go-playground/universal-translator v0.18.1 // indirect + github.com/go-playground/validator/v10 v10.30.1 // indirect github.com/goccy/go-json v0.10.5 // indirect + github.com/goccy/go-yaml v1.19.2 // indirect github.com/google/flatbuffers v25.12.19+incompatible // indirect github.com/gorilla/websocket v1.5.3 // indirect github.com/influxdata/line-protocol/v2 v2.2.1 // indirect + github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.18.2 // indirect github.com/klauspost/cpuid/v2 v2.3.0 // indirect + github.com/leodido/go-urn v1.4.0 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pelletier/go-toml/v2 v2.2.4 // indirect github.com/pierrec/lz4/v4 v4.1.23 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/quic-go/qpack v0.6.0 // indirect + github.com/quic-go/quic-go v0.59.0 // indirect github.com/rogpeppe/go-internal v1.14.1 // indirect + github.com/twitchyliquid64/golang-asm v0.15.1 // indirect + github.com/ugorji/go/codec v1.3.1 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect + go.mongodb.org/mongo-driver/v2 v2.5.0 // indirect + golang.org/x/arch v0.22.0 // indirect + golang.org/x/crypto v0.48.0 // indirect golang.org/x/exp v0.0.0-20250408133849-7e4ce0ab07d0 // indirect golang.org/x/mod v0.32.0 // indirect - golang.org/x/net v0.49.0 // indirect + golang.org/x/net v0.51.0 // indirect golang.org/x/sync v0.19.0 // indirect - golang.org/x/sys v0.40.0 // indirect + golang.org/x/sys v0.41.0 // indirect golang.org/x/telemetry v0.0.0-20260109210033-bd525da824e2 // indirect - golang.org/x/text v0.33.0 // indirect + golang.org/x/text v0.34.0 // indirect golang.org/x/tools v0.41.0 // indirect golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect diff --git a/db/src/influx/influx.go b/db/src/influx/influx.go index 411d0b6..5823158 100644 --- a/db/src/influx/influx.go +++ b/db/src/influx/influx.go @@ -1,4 +1,4 @@ -// Package influx_gateway_Lib provides an abstraction to the client influx. +// Package influx provides an abstraction to the client influx. package influx import ( @@ -17,6 +17,7 @@ type Gateway interface { AddDatapoint(dp datapoint.DataPointInfo) error Close() error Flush() error + Query(ctx context.Context, query string) (*influxdb3.QueryIterator, error) } // An InfluxGateway is the abstracted influx gateway. @@ -96,3 +97,8 @@ func (g *InfluxGateway) Flush() error { func (g *InfluxGateway) Close() error { return g.client.Close() } + +// Query executes a SQL query against InfluxDB using the Arrow Flight (gRPC) API. +func (g *InfluxGateway) Query(ctx context.Context, query string) (*influxdb3.QueryIterator, error) { + return g.client.Query(ctx, query) +} diff --git a/db/src/main.go b/db/src/main.go index 01b6353..3988b87 100644 --- a/db/src/main.go +++ b/db/src/main.go @@ -5,8 +5,10 @@ import ( "gateway/influx" "gateway/mqtt" point "gateway/point" + "gateway/rest" "log" "os" + "strings" "time" ) @@ -57,7 +59,7 @@ func mqttConnection() *mqtt.MqttGateway { func influxConnection() *influx.InfluxGateway { influxUrl, ok := os.LookupEnv("INFLUX_URL") if !ok { - influxUrl = "https://db.e.kb28.ch:443" + influxUrl = "http://db.e.kb28.ch:8181" } influxDatabase, ok := os.LookupEnv("INFLUX_DATABASE") if !ok { @@ -69,7 +71,7 @@ func influxConnection() *influx.InfluxGateway { influxToken = "password" } - log.Printf("[Main] InfluxDB config: URL=%s, DB=%s, Org=%s, Timeout=%v\n", influxUrl, influxDatabase) + log.Printf("[Main] InfluxDB config: URL=%s, DB=%s\n", influxUrl, influxDatabase) // Create the gateway gateway, err := influx.NewInfluxGateway(influxUrl, influxToken, influxDatabase) @@ -105,6 +107,18 @@ func main() { log.Fatal(err) } - // Keep the application running to receive messages + // Initialize and start REST Gateway + restGateway := rest.NewRestGateway(influxGateway) + + port, ok := os.LookupEnv("REST_PORT") + if !ok { + port = "8080" + } + + log.Printf("[Main] Starting REST Gateway on port %s\n", port) + if err := restGateway.Run(":" + port); err != nil { + log.Fatalf("[Main] Failed to start REST Gateway: %v", err) + } + select {} } diff --git a/db/src/rest/rest.go b/db/src/rest/rest.go new file mode 100644 index 0000000..f0a6535 --- /dev/null +++ b/db/src/rest/rest.go @@ -0,0 +1,63 @@ +package rest + +import ( + "context" + "gateway/influx" + "net/http" + + "github.com/gin-gonic/gin" +) + +type RestGateway struct { + influxGateway *influx.InfluxGateway + engine *gin.Engine +} + +func NewRestGateway(influxGateway *influx.InfluxGateway) *RestGateway { + g := &RestGateway{ + influxGateway: influxGateway, + engine: gin.Default(), + } + + g.setupRoutes() + return g +} + +func (g *RestGateway) setupRoutes() { + v1 := g.engine.Group("/api/v1") + { + v1.GET("/rooms", g.getRooms) + } +} + +func (g *RestGateway) Run(addr string) error { + return g.engine.Run(addr) +} + +// GET /api/v1/rooms +func (g *RestGateway) getRooms(c *gin.Context) { + // Query unique rooms from the "provence" measurement + query := `SELECT DISTINCT("room") FROM "provence"` + + // Using context.Background() as seen in working snippet + it, err := g.influxGateway.Query(context.Background(), query) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + + var rooms []string + for it.Next() { + val := it.Value() + if room, ok := val["room"].(string); ok { + rooms = append(rooms, room) + } + } + + if err := it.Err(); err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + + c.JSON(http.StatusOK, rooms) +}