cluster.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. package gorqlite
  2. /*
  3. this file holds most of the cluster-related stuff:
  4. types:
  5. peer
  6. rqliteCluster
  7. Connection methods:
  8. assembleURL (from a peer)
  9. updateClusterInfo (does the full cluster discovery via status)
  10. */
  11. /* *****************************************************************
  12. imports
  13. * *****************************************************************/
  14. import "bytes"
  15. import "encoding/json"
  16. import "errors"
  17. import "fmt"
  18. import "strings"
  19. //import "os"
  20. //import "reflect"
  21. /* *****************************************************************
  22. type: peer
  23. this is an internal type to abstract peer info.
  24. note that hostname is sometimes used for "has this struct been
  25. initialized" checks.
  26. * *****************************************************************/
  // peer identifies one cluster node by the host and port of its HTTP API.
  //
  // An empty hostname is used elsewhere in this file as the "not populated"
  // marker (see the leader check in updateClusterInfo).
  type peer struct {
  	// hostname is a hostname or "localhost".
  	hostname string
  	// port is kept as a string ("4001" or similar); it is only ever used
  	// as a string.
  	port string
  }

  // String renders the peer as "hostname:port".
  func (p *peer) String() string {
  	// All operands are strings, so fmt.Sprint inserts no separating spaces.
  	return fmt.Sprint(p.hostname, ":", p.port)
  }
  34. /* *****************************************************************
  35. type: rqliteCluster
  36. internal type that abstracts the full cluster state (leader, peers)
  37. * *****************************************************************/
  38. type rqliteCluster struct {
  39. leader peer
  40. otherPeers []peer
  41. conn *Connection
  42. }
  43. /* *****************************************************************
  44. method: rqliteCluster.makePeerList()
  45. in the api calls, we'll want to try the leader first, then the other
  46. peers. to make looping easy, this function returns a list of peers
  47. in the order to try them: leader, other peer, other peer, etc.
  48. * *****************************************************************/
  49. func (rc *rqliteCluster) makePeerList() []peer {
  50. trace("%s: makePeerList() called", rc.conn.ID)
  51. var peerList []peer
  52. peerList = append(peerList, rc.leader)
  53. for _, p := range rc.otherPeers {
  54. peerList = append(peerList, p)
  55. }
  56. trace("%s: makePeerList() returning this list:", rc.conn.ID)
  57. for n, v := range peerList {
  58. trace("%s: makePeerList() peer %d -> %s", rc.conn.ID, n, v.hostname+":"+v.port)
  59. }
  60. return peerList
  61. }
  62. /* *****************************************************************
  63. method: Connection.assembleURL()
  64. tell it what peer to talk to and what kind of API operation you're
  65. making, and it will return the full URL, from start to finish.
  66. e.g.:
  67. https://mary:secret2@server1.example.com:1234/db/query?transaction&level=strong
  68. note: this func needs to live at the Connection level because the
  69. Connection holds the username, password, consistencyLevel, etc.
  70. * *****************************************************************/
  71. func (conn *Connection) assembleURL(apiOp apiOperation, p peer) string {
  72. var stringBuffer bytes.Buffer
  73. if conn.wantsHTTPS == true {
  74. stringBuffer.WriteString("https")
  75. } else {
  76. stringBuffer.WriteString("http")
  77. }
  78. stringBuffer.WriteString("://")
  79. if conn.username != "" && conn.password != "" {
  80. stringBuffer.WriteString(conn.username)
  81. stringBuffer.WriteString(":")
  82. stringBuffer.WriteString(conn.password)
  83. stringBuffer.WriteString("@")
  84. }
  85. stringBuffer.WriteString(p.hostname)
  86. stringBuffer.WriteString(":")
  87. stringBuffer.WriteString(p.port)
  88. switch apiOp {
  89. case api_STATUS:
  90. stringBuffer.WriteString("/status")
  91. case api_QUERY:
  92. stringBuffer.WriteString("/db/query")
  93. case api_WRITE:
  94. stringBuffer.WriteString("/db/execute")
  95. }
  96. if apiOp == api_QUERY || apiOp == api_WRITE {
  97. stringBuffer.WriteString("?timings&transaction&level=")
  98. stringBuffer.WriteString(consistencyLevelNames[conn.consistencyLevel])
  99. }
  100. switch apiOp {
  101. case api_QUERY:
  102. trace("%s: assembled URL for an api_QUERY: %s", conn.ID, stringBuffer.String())
  103. case api_STATUS:
  104. trace("%s: assembled URL for an api_STATUS: %s", conn.ID, stringBuffer.String())
  105. case api_WRITE:
  106. trace("%s: assembled URL for an api_WRITE: %s", conn.ID, stringBuffer.String())
  107. }
  108. return stringBuffer.String()
  109. }
  110. /* *****************************************************************
  111. method: Connection.updateClusterInfo()
  112. upon invocation, updateClusterInfo() completely erases and refreshes
  113. the Connection's cluster info, replacing its rqliteCluster object
  114. with current info.
  115. the web heavy lifting (retrying, etc.) is done in rqliteApiGet()
  116. * *****************************************************************/
  117. func (conn *Connection) updateClusterInfo() error {
  118. trace("%s: updateClusterInfo() called", conn.ID)
  119. // start with a fresh new cluster
  120. var rc rqliteCluster
  121. rc.conn = conn
  122. responseBody, err := conn.rqliteApiGet(api_STATUS)
  123. if err != nil {
  124. return err
  125. }
  126. trace("%s: updateClusterInfo() back from api call OK", conn.ID)
  127. sections := make(map[string]interface{})
  128. err = json.Unmarshal(responseBody, &sections)
  129. if err != nil {
  130. return err
  131. }
  132. sMap := sections["store"].(map[string]interface{})
  133. leaderRaftAddr := sMap["leader"].(string)
  134. trace("%s: leader from store section is %s", conn.ID, leaderRaftAddr)
  135. // leader in this case is the RAFT address
  136. // we want the HTTP address, so we'll use this as
  137. // a key as we sift through APIPeers
  138. meta := sMap["meta"].(map[string]interface{})
  139. apiPeers := meta["APIPeers"].(map[string]interface{})
  140. for raftAddr, httpAddr := range apiPeers {
  141. trace("%s: examining httpAddr %s", conn.ID, httpAddr)
  142. /* httpAddr are usually hostname:port */
  143. var p peer
  144. parts := strings.Split(httpAddr.(string), ":")
  145. p.hostname = parts[0]
  146. p.port = parts[1]
  147. // so is this the leader?
  148. if leaderRaftAddr == raftAddr {
  149. trace("%s: found leader at %s", conn.ID, httpAddr)
  150. rc.leader = p
  151. } else {
  152. rc.otherPeers = append(rc.otherPeers, p)
  153. }
  154. }
  155. if rc.leader.hostname == "" {
  156. return errors.New("could not determine leader from API status call")
  157. }
  158. // dump to trace
  159. trace("%s: here is my cluster config:", conn.ID)
  160. trace("%s: leader : %s", conn.ID, rc.leader.String())
  161. for n, v := range rc.otherPeers {
  162. trace("%s: otherPeer #%d: %s", conn.ID, n, v.String())
  163. }
  164. // now make it official
  165. conn.cluster = rc
  166. return nil
  167. }