conn.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. package gorqlite
  2. /*
  3. this file contains some high-level Connection-oriented stuff
  4. */
  5. /* *****************************************************************
  6. imports
  7. * *****************************************************************/
  8. import "errors"
  9. import "fmt"
  10. import "io"
  11. import "net"
  12. import nurl "net/url"
  13. import "strings"
  14. var errClosed = errors.New("gorqlite: connection is closed")
  15. var traceOut io.Writer
  16. // defaults to false. This is used in trace() to quickly
  17. // return if tracing is off, so that we don't do a perhaps
  18. // expensive Sprintf() call only to send it to Discard
  19. var wantsTrace bool
  20. /* *****************************************************************
  21. type: Connection
  22. * *****************************************************************/
  23. /*
  24. The connection abstraction. Note that since rqlite is stateless,
  25. there really is no "connection". However, this type holds
  26. information such as the current leader, peers, connection
  27. string to build URLs, etc.
  28. Connections are assigned a "connection ID" which is a pseudo-UUID
  29. for connection identification in trace output only. This helps
  30. sort out what's going on if you have multiple connections going
  31. at once. It's generated using a non-standards-or-anything-else-compliant
  32. function that uses crypto/rand to generate 16 random bytes.
  33. Note that the Connection objection holds info on all peers, gathered
  34. at time of Open() from the node specified.
  35. */
  36. type Connection struct {
  37. cluster rqliteCluster
  38. /*
  39. name type default
  40. */
  41. username string // username or ""
  42. password string // username or ""
  43. consistencyLevel consistencyLevel // WEAK
  44. wantsHTTPS bool // false unless connection URL is https
  45. // variables below this line need to be initialized in Open()
  46. timeout int // 10
  47. hasBeenClosed bool // false
  48. ID string // generated in init()
  49. }
  50. /* *****************************************************************
  51. method: Connection.Close()
  52. * *****************************************************************/
  53. func (conn *Connection) Close() {
  54. conn.hasBeenClosed = true
  55. trace("%s: %s", conn.ID, "closing connection")
  56. }
  57. /* *****************************************************************
  58. method: Connection.ConsistencyLevel()
  59. * *****************************************************************/
  60. func (conn *Connection) ConsistencyLevel() (string, error) {
  61. if conn.hasBeenClosed {
  62. return "", errClosed
  63. }
  64. return consistencyLevelNames[conn.consistencyLevel], nil
  65. }
  66. /* *****************************************************************
  67. method: Connection.Leader()
  68. * *****************************************************************/
  69. func (conn *Connection) Leader() (string, error) {
  70. if conn.hasBeenClosed {
  71. return "", errClosed
  72. }
  73. trace("%s: Leader(), calling updateClusterInfo()", conn.ID)
  74. err := conn.updateClusterInfo()
  75. if err != nil {
  76. trace("%s: Leader() got error from updateClusterInfo(): %s", conn.ID, err.Error())
  77. return "", err
  78. } else {
  79. trace("%s: Leader(), updateClusterInfo() OK", conn.ID)
  80. }
  81. return conn.cluster.leader.String(), nil
  82. }
  83. /* *****************************************************************
  84. method: Connection.Peers()
  85. * *****************************************************************/
  86. func (conn *Connection) Peers() ([]string, error) {
  87. if conn.hasBeenClosed {
  88. var ans []string
  89. return ans, errClosed
  90. }
  91. plist := make([]string, 0)
  92. trace("%s: Peers(), calling updateClusterInfo()", conn.ID)
  93. err := conn.updateClusterInfo()
  94. if err != nil {
  95. trace("%s: Peers() got error from updateClusterInfo(): %s", conn.ID, err.Error())
  96. return plist, err
  97. } else {
  98. trace("%s: Peers(), updateClusterInfo() OK", conn.ID)
  99. }
  100. plist = append(plist, conn.cluster.leader.String())
  101. for _, p := range conn.cluster.otherPeers {
  102. plist = append(plist, p.String())
  103. }
  104. return plist, nil
  105. }
  106. /* *****************************************************************
  107. method: Connection.SetConsistencyLevel()
  108. * *****************************************************************/
  109. func (conn *Connection) SetConsistencyLevel(levelDesired string) error {
  110. if conn.hasBeenClosed {
  111. return errClosed
  112. }
  113. _, ok := consistencyLevels[levelDesired]
  114. if ok {
  115. conn.consistencyLevel = consistencyLevels[levelDesired]
  116. return nil
  117. }
  118. return errors.New(fmt.Sprintf("unknown consistency level: %s", levelDesired))
  119. }
  120. /* *****************************************************************
  121. method: Connection.initConnection()
  122. * *****************************************************************/
  123. /*
  124. initConnection takes the initial connection URL specified by
  125. the user, and parses it into a peer. This peer is assumed to
  126. be the leader. The next thing Open() does is updateClusterInfo()
  127. so the truth will be revealed soon enough.
  128. initConnection() does not talk to rqlite. It only parses the
  129. connection URL and prepares the new connection for work.
  130. URL format:
  131. http[s]://${USER}:${PASSWORD}@${HOSTNAME}:${PORT}/db?[OPTIONS]
  132. Examples:
  133. https://mary:secret2@localhost:4001/db
  134. https://mary:secret2@server1.example.com:4001/db?level=none
  135. https://mary:secret2@server2.example.com:4001/db?level=weak
  136. https://mary:secret2@localhost:2265/db?level=strong
  137. to use default connection to localhost:4001 with no auth:
  138. http://
  139. https://
  140. guaranteed map fields - will be set to "" if not specified
  141. field name default if not specified
  142. username ""
  143. password ""
  144. hostname "localhost"
  145. port "4001"
  146. consistencyLevel "weak"
  147. */
  148. func (conn *Connection) initConnection(url string) error {
  149. // do some sanity checks. You know users.
  150. if len(url) < 7 {
  151. return errors.New("url specified is impossibly short")
  152. }
  153. if strings.HasPrefix(url, "http") == false {
  154. return errors.New("url does not start with 'http'")
  155. }
  156. u, err := nurl.Parse(url)
  157. if err != nil {
  158. return err
  159. }
  160. trace("%s: net.url.Parse() OK", conn.ID)
  161. if u.Scheme == "https" {
  162. conn.wantsHTTPS = true
  163. }
  164. // specs say Username() is always populated even if empty
  165. if u.User == nil {
  166. conn.username = ""
  167. conn.password = ""
  168. } else {
  169. // guaranteed, but could be empty which is ok
  170. conn.username = u.User.Username()
  171. // not guaranteed, so test if set
  172. pass, isset := u.User.Password()
  173. if isset {
  174. conn.password = pass
  175. } else {
  176. conn.password = ""
  177. }
  178. }
  179. if u.Host == "" {
  180. conn.cluster.leader.hostname = "localhost"
  181. } else {
  182. conn.cluster.leader.hostname = u.Host
  183. }
  184. if u.Host == "" {
  185. conn.cluster.leader.hostname = "localhost"
  186. conn.cluster.leader.port = "4001"
  187. } else {
  188. // SplitHostPort() should only return an error if there is no host port.
  189. // I think.
  190. h, p, err := net.SplitHostPort(u.Host)
  191. if err != nil {
  192. conn.cluster.leader.hostname = u.Host
  193. } else {
  194. conn.cluster.leader.hostname = h
  195. conn.cluster.leader.port = p
  196. }
  197. }
  198. /*
  199. at the moment, the only allowed query is "level=" with
  200. the desired consistency level
  201. */
  202. // default
  203. conn.consistencyLevel = cl_WEAK
  204. if u.RawQuery != "" {
  205. if u.RawQuery == "level=weak" {
  206. // that's ok but nothing to do
  207. } else if u.RawQuery == "level=strong" {
  208. conn.consistencyLevel = cl_STRONG
  209. } else if u.RawQuery == "level=none" { // the fools!
  210. conn.consistencyLevel = cl_NONE
  211. } else {
  212. return errors.New("don't know what to do with this query: " + u.RawQuery)
  213. }
  214. }
  215. trace("%s: parseDefaultPeer() is done:", conn.ID)
  216. if conn.wantsHTTPS == true {
  217. trace("%s: %s -> %s", conn.ID, "wants https?", "yes")
  218. } else {
  219. trace("%s: %s -> %s", conn.ID, "wants https?", "no")
  220. }
  221. trace("%s: %s -> %s", conn.ID, "username", conn.username)
  222. trace("%s: %s -> %s", conn.ID, "password", conn.password)
  223. trace("%s: %s -> %s", conn.ID, "hostname", conn.cluster.leader.hostname)
  224. trace("%s: %s -> %s", conn.ID, "port", conn.cluster.leader.port)
  225. trace("%s: %s -> %s", conn.ID, "consistencyLevel", consistencyLevelNames[conn.consistencyLevel])
  226. conn.cluster.conn = conn
  227. return nil
  228. }