chore(tfstated): implement a transaction wrapper
This commit is contained in:
parent
25ed1188ed
commit
f649f7bbbf
5 changed files with 119 additions and 150 deletions
|
@ -50,42 +50,31 @@ func (db *DB) LoadAccountByUsername(username string) (*model.Account, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) InitAdminAccount() error {
|
func (db *DB) InitAdminAccount() error {
|
||||||
tx, err := db.Begin()
|
return db.WithTransaction(func(tx *sql.Tx) error {
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer func() {
|
|
||||||
if err != nil {
|
|
||||||
_ = tx.Rollback()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
var hasAdminAccount bool
|
var hasAdminAccount bool
|
||||||
if err = tx.QueryRowContext(db.ctx, `SELECT EXISTS (SELECT 1 FROM accounts WHERE is_admin);`).Scan(&hasAdminAccount); err != nil {
|
if err := tx.QueryRowContext(db.ctx, `SELECT EXISTS (SELECT 1 FROM accounts WHERE is_admin);`).Scan(&hasAdminAccount); err != nil {
|
||||||
return fmt.Errorf("failed to select if there is an admin account in the database: %w", err)
|
return fmt.Errorf("failed to select if there is an admin account in the database: %w", err)
|
||||||
}
|
}
|
||||||
if hasAdminAccount {
|
if !hasAdminAccount {
|
||||||
tx.Rollback()
|
|
||||||
} else {
|
|
||||||
var password uuid.UUID
|
var password uuid.UUID
|
||||||
if err = password.Generate(uuid.V4); err != nil {
|
if err := password.Generate(uuid.V4); err != nil {
|
||||||
return fmt.Errorf("failed to generate initial admin password: %w", err)
|
return fmt.Errorf("failed to generate initial admin password: %w", err)
|
||||||
}
|
}
|
||||||
salt := helpers.GenerateSalt()
|
salt := helpers.GenerateSalt()
|
||||||
hash := helpers.HashPassword(password.String(), salt)
|
hash := helpers.HashPassword(password.String(), salt)
|
||||||
if _, err = tx.ExecContext(db.ctx,
|
if _, err := tx.ExecContext(db.ctx,
|
||||||
`INSERT INTO accounts(username, salt, password_hash, is_admin)
|
`INSERT INTO accounts(username, salt, password_hash, is_admin)
|
||||||
VALUES ("admin", :salt, :hash, TRUE)
|
VALUES ("admin", :salt, :hash, TRUE)
|
||||||
ON CONFLICT DO UPDATE SET password_hash = :hash
|
ON CONFLICT DO UPDATE SET password_hash = :hash
|
||||||
WHERE username = "admin";`,
|
WHERE username = "admin";`,
|
||||||
sql.Named("salt", salt),
|
sql.Named("salt", salt),
|
||||||
sql.Named("hash", hash),
|
sql.Named("hash", hash),
|
||||||
); err != nil {
|
); err == nil {
|
||||||
|
AdvertiseAdminPassword(password.String())
|
||||||
|
} else {
|
||||||
return fmt.Errorf("failed to set initial admin password: %w", err)
|
return fmt.Errorf("failed to set initial admin password: %w", err)
|
||||||
}
|
}
|
||||||
err = tx.Commit()
|
|
||||||
if err == nil {
|
|
||||||
AdvertiseAdminPassword(password.String())
|
|
||||||
}
|
}
|
||||||
}
|
return nil
|
||||||
return err
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package database
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
"runtime"
|
"runtime"
|
||||||
|
|
||||||
"git.adyxax.org/adyxax/tfstated/pkg/scrypto"
|
"git.adyxax.org/adyxax/tfstated/pkg/scrypto"
|
||||||
|
@ -81,10 +82,6 @@ func NewDB(ctx context.Context, url string) (*DB, error) {
|
||||||
return &db, nil
|
return &db, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) Begin() (*sql.Tx, error) {
|
|
||||||
return db.writeDB.Begin()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (db *DB) Close() error {
|
func (db *DB) Close() error {
|
||||||
if err := db.readDB.Close(); err != nil {
|
if err := db.readDB.Close(); err != nil {
|
||||||
_ = db.writeDB.Close()
|
_ = db.writeDB.Close()
|
||||||
|
@ -107,3 +104,20 @@ func (db *DB) SetDataEncryptionKey(s string) error {
|
||||||
func (db *DB) SetVersionsHistoryLimit(n int) {
|
func (db *DB) SetVersionsHistoryLimit(n int) {
|
||||||
db.versionsHistoryLimit = n
|
db.versionsHistoryLimit = n
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (db *DB) WithTransaction(f func(tx *sql.Tx) error) error {
|
||||||
|
tx, err := db.writeDB.Begin()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = f(tx)
|
||||||
|
if err == nil {
|
||||||
|
err = tx.Commit()
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
if err2 := tx.Rollback(); err2 != nil {
|
||||||
|
panic(fmt.Sprintf("failed to rollback transaction: %+v. Reason for rollback: %+v", err2, err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
|
@ -10,45 +10,32 @@ import (
|
||||||
// true if the function locked the state, otherwise returns false and the lock
|
// true if the function locked the state, otherwise returns false and the lock
|
||||||
// parameter is updated to the value of the existing lock
|
// parameter is updated to the value of the existing lock
|
||||||
func (db *DB) SetLockOrGetExistingLock(path string, lock any) (bool, error) {
|
func (db *DB) SetLockOrGetExistingLock(path string, lock any) (bool, error) {
|
||||||
tx, err := db.Begin()
|
ret := false
|
||||||
if err != nil {
|
return ret, db.WithTransaction(func(tx *sql.Tx) error {
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
defer func() {
|
|
||||||
if err != nil {
|
|
||||||
_ = tx.Rollback()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
var lockData []byte
|
var lockData []byte
|
||||||
if err = tx.QueryRowContext(db.ctx, `SELECT lock FROM states WHERE path = ?;`, path).Scan(&lockData); err != nil {
|
if err := tx.QueryRowContext(db.ctx, `SELECT lock FROM states WHERE path = ?;`, path).Scan(&lockData); err != nil {
|
||||||
if errors.Is(err, sql.ErrNoRows) {
|
if errors.Is(err, sql.ErrNoRows) {
|
||||||
if lockData, err = json.Marshal(lock); err != nil {
|
if lockData, err = json.Marshal(lock); err != nil {
|
||||||
return false, err
|
return err
|
||||||
}
|
}
|
||||||
_, err = tx.ExecContext(db.ctx, `INSERT INTO states(path, lock) VALUES (?, json(?))`, path, lockData)
|
_, err = tx.ExecContext(db.ctx, `INSERT INTO states(path, lock) VALUES (?, json(?))`, path, lockData)
|
||||||
if err != nil {
|
ret = true
|
||||||
return false, err
|
return err
|
||||||
}
|
|
||||||
err = tx.Commit()
|
|
||||||
return true, err
|
|
||||||
} else {
|
} else {
|
||||||
return false, err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if lockData != nil {
|
if lockData != nil {
|
||||||
_ = tx.Rollback()
|
return json.Unmarshal(lockData, lock)
|
||||||
err = json.Unmarshal(lockData, lock)
|
|
||||||
return false, err
|
|
||||||
}
|
}
|
||||||
|
var err error
|
||||||
if lockData, err = json.Marshal(lock); err != nil {
|
if lockData, err = json.Marshal(lock); err != nil {
|
||||||
return false, err
|
return err
|
||||||
}
|
}
|
||||||
_, err = tx.ExecContext(db.ctx, `UPDATE states SET lock = json(?) WHERE path = ?;`, lockData, path)
|
_, err = tx.ExecContext(db.ctx, `UPDATE states SET lock = json(?) WHERE path = ?;`, lockData, path)
|
||||||
if err != nil {
|
ret = true
|
||||||
return false, err
|
return err
|
||||||
}
|
})
|
||||||
err = tx.Commit()
|
|
||||||
return true, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) Unlock(path, lock any) (bool, error) {
|
func (db *DB) Unlock(path, lock any) (bool, error) {
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package database
|
package database
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"database/sql"
|
||||||
"embed"
|
"embed"
|
||||||
"io/fs"
|
"io/fs"
|
||||||
|
|
||||||
|
@ -28,16 +29,7 @@ func (db *DB) migrate() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
tx, err := db.Begin()
|
return db.WithTransaction(func(tx *sql.Tx) error {
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer func() {
|
|
||||||
if err != nil {
|
|
||||||
_ = tx.Rollback()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
var version int
|
var version int
|
||||||
if err = tx.QueryRowContext(db.ctx, `SELECT version FROM schema_version;`).Scan(&version); err != nil {
|
if err = tx.QueryRowContext(db.ctx, `SELECT version FROM schema_version;`).Scan(&version); err != nil {
|
||||||
if err.Error() == "no such table: schema_version" {
|
if err.Error() == "no such table: schema_version" {
|
||||||
|
@ -53,11 +45,7 @@ func (db *DB) migrate() error {
|
||||||
}
|
}
|
||||||
version++
|
version++
|
||||||
}
|
}
|
||||||
if _, err = tx.ExecContext(db.ctx, `DELETE FROM schema_version; INSERT INTO schema_version (version) VALUES (?);`, version); err != nil {
|
_, err = tx.ExecContext(db.ctx, `DELETE FROM schema_version; INSERT INTO schema_version (version) VALUES (?);`, version)
|
||||||
return err
|
return err
|
||||||
}
|
})
|
||||||
if err = tx.Commit(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,15 +48,8 @@ func (db *DB) SetState(path string, accountID int, data []byte, lockID string) (
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
tx, err := db.Begin()
|
ret := false
|
||||||
if err != nil {
|
return ret, db.WithTransaction(func(tx *sql.Tx) error {
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
defer func() {
|
|
||||||
if err != nil {
|
|
||||||
_ = tx.Rollback()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
var (
|
var (
|
||||||
stateID int64
|
stateID int64
|
||||||
lockData []byte
|
lockData []byte
|
||||||
|
@ -66,20 +59,21 @@ func (db *DB) SetState(path string, accountID int, data []byte, lockID string) (
|
||||||
var result sql.Result
|
var result sql.Result
|
||||||
result, err = tx.ExecContext(db.ctx, `INSERT INTO states(path) VALUES (?)`, path)
|
result, err = tx.ExecContext(db.ctx, `INSERT INTO states(path) VALUES (?)`, path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return err
|
||||||
}
|
}
|
||||||
stateID, err = result.LastInsertId()
|
stateID, err = result.LastInsertId()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return false, err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if lockID != "" && slices.Compare([]byte(lockID), lockData) != 0 {
|
if lockID != "" && slices.Compare([]byte(lockID), lockData) != 0 {
|
||||||
err = fmt.Errorf("failed to update state, lock ID does not match")
|
err = fmt.Errorf("failed to update state, lock ID does not match")
|
||||||
return true, err
|
ret = true
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
_, err = tx.ExecContext(db.ctx,
|
_, err = tx.ExecContext(db.ctx,
|
||||||
`INSERT INTO versions(account_id, state_id, data, lock)
|
`INSERT INTO versions(account_id, state_id, data, lock)
|
||||||
|
@ -90,7 +84,7 @@ func (db *DB) SetState(path string, accountID int, data []byte, lockID string) (
|
||||||
sql.Named("stateID", stateID),
|
sql.Named("stateID", stateID),
|
||||||
sql.Named("data", encryptedData))
|
sql.Named("data", encryptedData))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return err
|
||||||
}
|
}
|
||||||
_, err = tx.ExecContext(db.ctx,
|
_, err = tx.ExecContext(db.ctx,
|
||||||
`DELETE FROM versions
|
`DELETE FROM versions
|
||||||
|
@ -107,9 +101,6 @@ func (db *DB) SetState(path string, accountID int, data []byte, lockID string) (
|
||||||
sql.Named("limit", db.versionsHistoryLimit),
|
sql.Named("limit", db.versionsHistoryLimit),
|
||||||
sql.Named("path", path),
|
sql.Named("path", path),
|
||||||
)
|
)
|
||||||
if err != nil {
|
return err
|
||||||
return false, err
|
})
|
||||||
}
|
|
||||||
err = tx.Commit()
|
|
||||||
return false, err
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue